����JFIFXX�����    $.' ",#(7),01444'9=82<.342  2!!22222222222222222222222222222222222222222222222222����"��4�� ���,�PG"Z_�4�˷����kjز�Z�,F+��_z�,�© �����zh6�٨�ic�fu���#ډb���_�N�?��wQ���5-�~�I���8����TK<5o�Iv-�����k�_U_�����~b�M��d����Ӝ�U�Hh��?]��E�w��Q���k�{��_}qFW7HTՑ��Y��F�?_�'ϔ��_�Ջt��=||I ��6�έ"�����D���/[�k�9���Y�8ds|\���Ҿp6�Ҵ���]��.����6�z<�v��@]�i%��$j��~�g��J>��no����pM[me�i$[����s�o�ᘨ�˸ nɜG-�ĨU�ycP�3.DB�li�;��hj���x7Z^�N�h������N3u{�:j�x�힞��#M&��jL P@_���� P��&��o8������9�����@Sz6�t7#O�ߋ �s}Yf�T���lmr����Z)'N��k�۞p����w\�Tȯ?�8`�O��i{wﭹW�[�r�� ��Q4F�׊���3m&L�=��h3����z~��#�\�l :�F,j@�� ʱ�wQT����8�"kJO���6�֚l����}���R�>ډK���]��y����&����p�}b��;N�1�m�r$�|��7�>e�@B�TM*-iH��g�D�)� E�m�|�ؘbҗ�a��Ҿ����t4���o���G��*oCN�rP���Q��@z,|?W[0�����:�n,jWiE��W��$~/�hp\��?��{(�0���+�Y8rΟ�+����>S-S����VN;�}�s?.����� w�9��˟<���Mq4�Wv'��{)0�1mB��V����W[�����8�/<� �%���wT^�5���b��)iM� pg�N�&ݝ��VO~�q���u���9� ����!��J27����$O-���! �:�%H��� ـ����y�ΠM=t{!S�� oK8������t<����è:a������[�����ա�H���~��w��Qz`�po�^ ����Q��n� �,uu�C�$ ^���,������8�#��:�6��e�|~���!�3�3.�\0��q��o�4`.|� ����y�Q�`~;�d�ׯ,��O�Zw�������`73�v�܋�<���Ȏ�� ـ4k��5�K�a�u�=9Yd��$>x�A�&�� j0� ���vF��� Y�|�y��� ~�6�@c��1vOp�Ig����4��l�OD���L����� R���c���j�_�uX6��3?nk��Wy�f;^*B� ��@�~a�`��Eu������+���6�L��.ü>��}y���}_�O�6�͐�:�YrG�X��kG�����l^w���~㒶sy��Iu�!� W ��X��N�7BV��O��!X�2����wvG�R�f�T#�����t�/?���%8�^�W�aT��G�cL�M���I��(J����1~�8�?aT ���]����AS�E��(��*E}� 2��#I/�׍qz��^t�̔���b�Yz4x���t�){ OH��+(E��A&�N�������XT��o��"�XC��'���)}�J�z�p� ��~5�}�^����+�6����w��c��Q�|Lp�d�H��}�(�.|����k��c4^�"�����Z?ȕ ��a<�L�!039C� �Eu�C�F�Ew�ç ;�n?�*o���B�8�bʝ���'#Rqf���M}7����]����s2tcS{�\icTx;�\��7K���P���ʇ Z O-��~��c>"��?�������P��E��O�8��@�8��G��Q�g�a�Վ���󁶠�䧘��_%#r�>�1�z�a��eb��qcPѵ��n���#L��� =��׀t� L�7�`��V���A{�C:�g���e@�w1 Xp3�c3�ġ����p��M"'-�@n4���fG��B3�DJ�8[Jo�ߐ���gK)ƛ��$���� ���8�3�����+���� �����6�ʻ���� ���S�kI�*KZlT 
_`���?��K����QK�d����B`�s}�>���`��*�>��,*@J�d�oF*����弝��O}�k��s��]��y�ߘ��c1G�V���<=�7��7����6�q�PT��tXԀ�!9*4�4Tހ3XΛex�46���Y��D ����� �BdemDa����\�_l,��G�/���֌7���Y�](�xTt^%�GE�����4�}bT���ڹ�����;Y)���B�Q��u��>J/J �⮶.�XԄ��j�ݳ�+E��d ��r�5�_D�1 ��o�� �B�x�΢�#���<��W�����8���R6�@g�M�.��� dr�D��>(otU��@x=��~v���2� ӣ�d�oBd��3�eO�6�㣷�����ݜ6��6Y��Qz`��S��{���\P�~z m5{J/L��1������<�e�ͅPu�b�]�ϔ���'������f�b� Zpw��c`"��i���BD@:)ִ�:�]��hv�E�w���T�l��P���"Ju�}��وV J��G6��. J/�Qgl߭�e�����@�z�Zev2u�)]կ�����7x���s�M�-<ɯ�c��r�v�����@��$�ޮ}lk���a���'����>x��O\�ZFu>�����ck#��&:��`�$�ai�>2Δ����l���oF[h��lE�ܺ�Πk:)���`�� $[6�����9�����kOw�\|���8}������ބ:��񶐕��I�A1/�=�2[�,�!��.}gN#�u����b��� ~��݊��}34q����d�E��Lc��$��"�[q�U�硬g^��%B �z���r�pJ�ru%v\h1Y�ne`ǥ:g���pQM~�^�Xi� ��`S�:V29.�P���V�?B�k�� AEvw%�_�9C�Q����wKekPؠ�\�;Io d�{ ߞo�c1eP����\� `����E=���@K<�Y���eڼ�J���w����{av�F�'�M�@/J��+9p���|]�����Iw &`��8���&M�hg��[�{��Xj��%��Ӓ�$��(����ʹN���<>�I���RY���K2�NPlL�ɀ)��&e����B+ь����( � �JTx���_?EZ� }@ 6�U���뙢ط�z��dWI�n` D����噥�[��uV��"�G&Ú����2g�}&m��?ċ�"����Om#��������� ��{�ON��"S�X��Ne��ysQ���@Fn��Vg���dX�~nj�]J�<�K]:��FW��b�������62�=��5f����JKw��bf�X�55��~J �%^����:�-�QIE��P��v�nZum� z � ~ə ���� ���ة����;�f��\v���g�8�1��f24;�V���ǔ�)����9���1\��c��v�/'Ƞ�w�������$�4�R-��t���� e�6�/�ġ �̕Ecy�J���u�B���<�W�ַ~�w[B1L۲�-JS΂�{���΃������A��20�c#��@ 0!1@AP"#2Q`$3V�%45a6�FRUq��� ����^7ׅ,$n�������+��F�`��2X'��0vM��p�L=������5��8������u�p~���.�`r�����\���O��,ư�0oS ��_�M�����l���4�kv\JSd���x���SW�<��Ae�IX����������$I���w�:S���y���›R��9�Q[���,�5�;�@]�%���u�@ *ro�lbI �� ��+���%m:�͇ZV�����u�̉����θau<�fc�.����{�4Ա� �Q����*�Sm��8\ujqs]{kN���)qO�y�_*dJ�b�7���yQqI&9�ԌK!�M}�R�;������S�T���1���i[U�ɵz�]��U)V�S6���3$K{�ߊ<�(� E]Զ[ǼENg�����'�\?#)Dkf��J���o��v���'�%ƞ�&K�u�!��b�35LX�Ϸ��63$K�a�;�9>,R��W��3�3� d�JeTYE.Mϧ��-�o�j3+y��y^�c�������VO�9NV\nd�1 ��!͕_)a�v;����թ�M�lWR1��)El��P;��yوÏ�u 3�k�5Pr6<�⒲l�!˞*��u־�n�!�l:����UNW ��%��Chx8vL'��X�@��*��)���̮��ˍ��� 
���D-M�+J�U�kvK����+�x8��cY������?�Ԡ��~3mo��|�u@[XeY�C�\Kp�x8�oC�C�&����N�~3-H���� ��MX�s�u<`���~"WL��$8ξ��3���a�)|:@�m�\���^�`�@ҷ)�5p+��6���p�%i)P M���ngc�����#0Aruz���RL+xSS?���ʮ}()#�t��mˇ!��0}}y����<�e� �-ή�Ԩ��X������ MF���ԙ~l L.3���}�V뽺�v�����멬��Nl�)�2����^�Iq��a��M��qG��T�����c3#������3U�Ǎ���}��לS�|qa��ڃ�+���-��2�f����/��bz��ڐ�� �ݼ[2�ç����k�X�2�* �Z�d���J�G����M*9W���s{��w���T��x��y,�in�O�v��]���n����P�$�JB@=4�OTI�n��e�22a\����q�d���%�$��(���:���: /*�K[PR�fr\nڙdN���F�n�$�4�[�� U�zƶ����� �mʋ���,�ao�u 3�z� �x��Kn����\[��VFmbE;�_U��&V�Gg�]L�۪&#n%�$ɯ�dG���D�TI=�%+AB�Ru#��b4�1�»x�cs�YzڙJG��f��Il��d�eF'T� iA��T���uC�$����Y��H?����[!G`}���ͪ� �纤Hv\������j�Ex�K���!���OiƸ�Yj�+u-<���'q����uN�*�r\��+�]���<�wOZ.fp�ێ��,-*)V?j-kÊ#�`�r��dV����(�ݽBk�����G�ƛk�QmUڗe��Z���f}|����8�8��a���i��3'J�����~G_�^���d�8w������ R�`(�~�.��u���l�s+g�bv���W���lGc}��u���afE~1�Ue������Z�0�8�=e�� f@/�jqEKQQ�J��oN��J���W5~M>$6�Lt�;$ʳ{���^��6�{����v6���ķܰg�V�cnn �~z�x�«�,2�u�?cE+Ș�H؎�%�Za�)���X>uW�Tz�Nyo����s���FQƤ��$��*�&�LLXL)�1�" L��eO��ɟ�9=���:t��Z���c��Ž���Y?�ӭV�wv�~,Y��r�ۗ�|�y��GaF�����C�����.�+� ���v1���fήJ�����]�S��T��B��n5sW}y�$��~z�'�c ��8 ��� ,! 
�p��VN�S��N�N�q��y8z˱�A��4��*��'������2n<�s���^ǧ˭P�Jޮɏ�U�G�L�J�*#��<�V��t7�8����TĜ>��i}K%,���)[��z�21z ?�N�i�n1?T�I�R#��m-�����������������1����lA�`��fT5+��ܐ�c�q՝��ʐ��,���3�f2U�եmab��#ŠdQ�y>\��)�SLY����w#��.���ʑ�f��� ,"+�w�~�N�'�c�O�3F�������N<���)j��&��,-� �љ���֊�_�zS���TǦ����w�>��?�������n��U仆�V���e�����0���$�C�d���rP �m�׈e�Xm�Vu� �L��.�bֹ��� �[Դaզ���*��\y�8�Է:�Ez\�0�Kq�C b��̘��cө���Q��=0Y��s�N��S.���3.���O�o:���#���v7�[#߫ ��5�܎�L���Er4���9n��COWlG�^��0k�%<���ZB���aB_���������'=��{i�v�l�$�uC���mƎҝ{�c㱼�y]���W�i ��ߧc��m�H� m�"�"�����;Y�ߝ�Z�Ǔ�����:S#��|}�y�,/k�Ld� TA�(�AI$+I3��;Y*���Z��}|��ӧO��d�v��..#:n��f>�>���ȶI�TX��� 8��y����"d�R�|�)0���=���n4��6ⲑ�+��r<�O�܂~zh�z����7ܓ�HH�Ga롏���nCo�>������a ���~]���R���̲c?�6(�q�;5%� |�uj�~z8R=X��I�V=�|{v�Gj\gc��q����z�؋%M�ߍ����1y��#��@f^���^�>N�����#x#۹��6�Y~�?�dfPO��{��P�4��V��u1E1J �*|���%���JN��`eWu�zk M6���q t[�� ��g�G���v��WIG��u_ft����5�j�"�Y�:T��ɐ���*�;� e5���4����q$C��2d�}���� _S�L#m�Yp��O�.�C�;��c����Hi#֩%+) �Ӎ��ƲV���SYź��g |���tj��3�8���r|���V��1#;.SQ�A[���S������#���`n�+���$��$I �P\[�@�s��(�ED�z���P��])8�G#��0B��[ى��X�II�q<��9�~[Z멜�Z�⊔IWU&A>�P~�#��dp<�?����7���c��'~���5 ��+$���lx@�M�dm��n<=e�dyX��?{�|Aef ,|n3�<~z�ƃ�uۧ�����P��Y,�ӥQ�*g�#먙R�\���;T��i,��[9Qi歉����c>]9�� ��"�c��P�� �Md?٥��If�ت�u��k��/����F��9�c*9��Ǎ:�ØF���z�n*�@|I�ށ9����N3{'��[�'ͬ�Ҳ4��#}��!�V� Fu��,�,mTIk���v C�7v���B�6k�T9��1�*l� '~��ƞF��lU��'�M ����][ΩũJ_�{�i�I�n��$���L�� j��O�dx�����kza۪��#�E��Cl����x˘�o�����V���ɞ�ljr��)�/,�߬h�L��#��^��L�ф�,íMƁe�̩�NB�L�����iL����q�}��(��q��6IçJ$�W�E$��:������=#����(�K�B����zђ <��K(�N�۫K�w��^O{!����)�H���>x�������lx�?>Պ�+�>�W���,Ly!_�D���Ō�l���Q�!�[ �S����J��1��Ɛ�Y}��b,+�Lo�x�ɓ)����=�y�oh�@�꥟/��I��ѭ=��P�y9��� �ۍYӘ�e+�p�Jnϱ?V\SO%�(�t� ���=?MR�[Ș�����d�/ ��n�l��B�7j� ��!�;ӥ�/�[-���A�>�dN�sLj ��,ɪv��=1c�.SQ�O3�U���ƀ�ܽ�E����������̻��9G�ϷD�7(�}��Ävӌ\�y�_0[w ���<΍>����a_��[0+�L��F.�޺��f�>oN�T����q;���y\��bՃ��y�jH�<|q-eɏ�_?_9+P���Hp$�����[ux�K 
w�Mw��N�ی'$Y2�=��q���KB��P��~������Yul:�[<����F1�2�O���5=d����]Y�sw:���Ϯ���E��j,_Q��X��z`H1,#II ��d�wr��P˂@�ZJV����y$�\y�{}��^~���[:N����ߌ�U�������O��d�����ؾe��${p>G��3c���Ė�lʌ�� ת��[��`ϱ�-W����dg�I��ig2��� ��}s ��ؤ(%#sS@���~���3�X�nRG�~\jc3�v��ӍL��M[JB�T��s3}��j�Nʖ��W����;7��ç?=X�F=-�=����q�ߚ���#���='�c��7���ڑW�I(O+=:uxq�������������e2�zi+�kuG�R��������0�&e�n���iT^J����~\jy���p'dtG��s����O��3����9* �b#Ɋ�� p������[Bws�T�>d4�ۧs���nv�n���U���_�~,�v����ƜJ1��s�� �QIz��)�(lv8M���U=�;����56��G���s#�K���MP�=��LvyGd��}�VwWBF�'�à �?MH�U�g2�� ����!�p�7Q��j��ڴ����=��j�u��� Jn�A s���uM������e��Ɔ�Ҕ�!)'��8Ϣ�ٔ��ޝ(��Vp���צ֖d=�IC�J�Ǡ{q������kԭ�߸���i��@K����u�|�p=..�*+����x�����z[Aqġ#s2a�Ɗ���RR�)*HRsi�~�a &f��M��P����-K�L@��Z��Xy�'x�{}��Zm+���:�)�) IJ�-i�u���� ���ܒH��'�L(7�y�GӜq���� j��� 6ߌg1�g�o���,kر���tY�?W,���p���e���f�OQS��!K�۟cҒA�|ս�j�>��=⬒��˧L[�� �߿2JaB~R��u�:��Q�] �0H~���]�7��Ƽ�I���(}��cq '�ήET���q�?f�ab���ӥvr� �)o��-Q��_'����ᴎo��K������;��V���o��%���~OK ����*��b�f:���-ťIR��`B�5!RB@���ï�� �u �̯e\�_U�_������� g�ES��3�������QT��a����x����U<~�c?�*�#]�MW,[8O�a�x��]�1bC|踤�P��lw5V%�)�{t�<��d��5���0i�XSU��m:��Z�┵�i�"��1�^B�-��P�hJ��&)O��*�D��c�W��vM��)����}���P��ܗ-q����\mmζZ-l@�}��a��E�6��F�@��&Sg@���ݚ�M����� ȹ 4����#p�\H����dYDo�H���"��\��..R�B�H�z_�/5˘����6��KhJR��P�mƶi�m���3�,#c�co��q�a)*Pt����R�m�k�7x�D�E�\Y�閣_X�<���~�)���c[[�BP����6�Yq���S��0����%_����;��Àv�~�| VS؇ ��'O0��F0��\���U�-�d@�����7�SJ*z��3n��y��P����O���������m�~�P�3|Y��ʉr#�C�<�G~�.,! ���bqx���h~0=��!ǫ�jy����l�O,�[B��~��|9��ٱ����Xly�#�i�B��g%�S��������tˋ���e���ې��\[d�t)��.+u�|1 ������#�~Oj����hS�%��i.�~X���I�H�m��0n���c�1uE�q��cF�RF�o���7� �O�ꮧ� ���ۛ{��ʛi5�rw?׌#Qn�TW��~?y$��m\�\o����%W� ?=>S�N@�� �Ʈ���R����N�)�r"C�:��:����� �����#��qb��Y�. 
�6[��2K����2u�Ǧ�HYR��Q�MV��� �G�$��Q+.>�����nNH��q�^��� ����q��mM��V��D�+�-�#*�U�̒ ���p욳��u:�������IB���m���PV@O���r[b= �� ��1U�E��_Nm�yKbN�O���U�}�the�`�|6֮P>�\2�P�V���I�D�i�P�O;�9�r�mAHG�W�S]��J*�_�G��+kP�2����Ka�Z���H�'K�x�W�MZ%�O�YD�Rc+o��?�q��Ghm��d�S�oh�\�D�|:W������UA�Qc yT�q������~^�H��/��#p�CZ���T�I�1�ӏT����4��"�ČZ�����}��`w�#�*,ʹ�� ��0�i��課�Om�*�da��^gJ݅{���l�e9uF#T�ֲ��̲�ٞC"�q���ߍ ոޑ�o#�XZTp����@ o�8��(jd��xw�]�,f���`~�|,s��^����f�1���t��|��m�򸄭/ctr��5s��7�9Q�4�H1꠲BB@l9@���C�����+�wp�xu�£Yc�9��?`@#�o�mH�s2��)�=��2�.�l����jg�9$�Y�S�%*L������R�Y������7Z���,*=�䷘$�������arm�o�ϰ���UW.|�r�uf����IGw�t����Zwo��~5 ��YյhO+=8fF�)�W�7�L9lM�̘·Y���֘YLf�큹�pRF���99.A �"wz��=E\Z���'a� 2��Ǚ�#;�'}�G���*��l��^"q��+2FQ� hj��kŦ��${���ޮ-�T�٭cf�|�3#~�RJ����t��$b�(R��(����r���dx� >U b�&9,>���%E\� Ά�e�$��'�q't��*�א���ެ�b��-|d���SB�O�O��$�R+�H�)�܎�K��1m`;�J�2�Y~9��O�g8=vqD`K[�F)k�[���1m޼c��n���]s�k�z$@��)!I �x՝"v��9=�ZA=`Ɠi �:�E��)`7��vI��}d�YI�_ �o�:ob���o ���3Q��&D&�2=�� �Ά��;>�h����y.*ⅥS������Ӭ�+q&����j|UƧ����}���J0��WW< ۋS�)jQR�j���Ư��rN)�Gű�4Ѷ(�S)Ǣ�8��i��W52���No˓� ۍ%�5brOn�L�;�n��\G����=�^U�dI���8$�&���h��'���+�(������cȁ߫k�l��S^���cƗjԌE�ꭔ��gF���Ȓ��@���}O���*;e�v�WV���YJ\�]X'5��ղ�k�F��b 6R�o՜m��i N�i����>J����?��lPm�U��}>_Z&�KK��q�r��I�D�Չ~�q�3fL�:S�e>���E���-G���{L�6p�e,8��������QI��h��a�Xa��U�A'���ʂ���s�+טIjP�-��y�8ۈZ?J$��W�P� ��R�s�]��|�l(�ԓ��sƊi��o(��S0��Y� 8�T97.�����WiL��c�~�dxc�E|�2!�X�K�Ƙਫ਼�$((�6�~|d9u+�qd�^3�89��Y�6L�.I�����?���iI�q���9�)O/뚅����O���X��X�V��ZF[�یgQ�L��K1���RҖr@v�#��X�l��F���Нy�S�8�7�kF!A��sM���^rkp�jP�DyS$N���q��nxҍ!U�f�!eh�i�2�m���`�Y�I�9r�6� �TF���C}/�y�^���Η���5d�'��9A-��J��>{�_l+�`��A���[�'��յ�ϛ#w:݅�%��X�}�&�PSt�Q�"�-��\縵�/����$Ɨh�Xb�*�y��BS����;W�ջ_mc�����vt?2}1�;qS�d�d~u:2k5�2�R�~�z+|HE!)�Ǟl��7`��0�<�,�2*���Hl-��x�^����'_TV�gZA�'j� ^�2Ϊ��N7t�����?w�� �x1��f��Iz�C-Ȗ��K�^q�;���-W�DvT�7��8�Z�������� hK�(P:��Q- �8�n�Z���܃e貾�<�1�YT<�,�����"�6{/ �?�͟��|1�:�#g��W�>$����d��J��d�B��=��jf[��%rE^��il:��B���x���Sּ�1հ��,�=��*�7 fcG��#q� 
�eh?��2�7�����,�!7x��6�n�LC�4x��},Geǝ�tC.��vS �F�43��zz\��;QYC,6����~;RYS/6���|2���5���v��T��i����������mlv��������&� �nRh^ejR�LG�f���? �ۉҬܦƩ��|��Ȱ����>3����!v��i�ʯ�>�v��オ�X3e���_1z�Kȗ\<������!�8���V��]��?b�k41�Re��T�q��mz��TiOʦ�Z��Xq���L������q"+���2ۨ��8}�&N7XU7Ap�d�X��~�׿��&4e�o�F��� �H����O���č�c�� 懴�6���͉��+)��v;j��ݷ�� �UV�� i��� j���Y9GdÒJ1��詞�����V?h��l����l�cGs�ځ�������y�Ac�����\V3�? �� ܙg�>qH�S,�E�W�[�㺨�uch�⍸�O�}���a��>�q�6�n6����N6�q������N ! 1AQaq�0@����"2BRb�#Pr���3C`��Scst���$4D���%Td�� ?���N����a��3��m���C���w��������xA�m�q�m���m������$����4n淿t'��C"w��zU=D�\R+w�p+Y�T�&�պ@��ƃ��3ޯ?�Aﶂ��aŘ���@-�����Q�=���9D��ռ�ѻ@��M�V��P��܅�G5�f�Y<�u=,EC)�<�Fy'�"�&�չ�X~f��l�KԆV��?�� �W�N����=(� �;���{�r����ٌ�Y���h{�١������jW����P���Tc�����X�K�r��}���w�R��%��?���E��m�� �Y�q|����\lEE4���r���}�lsI�Y������f�$�=�d�yO����p�����yBj8jU�o�/�S��?�U��*������ˍ�0������u�q�m [�?f����a�� )Q�>����6#������� ?����0UQ����,IX���(6ڵ[�DI�MNލ�c&���υ�j\��X�R|,4��� j������T�hA�e��^���d���b<����n�� �즇�=!���3�^�`j�h�ȓr��jẕ�c�,ٞX����-����a�ﶔ���#�$��]w�O��Ӫ�1y%��L�Y<�wg#�ǝ�̗`�x�xa�t�w��»1���o7o5��>�m뭛C���Uƃߜ}�C���y1Xνm�F8�jI���]����H���ۺиE@I�i;r�8ӭ����V�F�Շ| ��&?�3|x�B�MuS�Ge�=Ӕ�#BE5G�����Y!z��_e��q�р/W>|-�Ci߇�t�1ޯќd�R3�u��g�=0 5��[?�#͏��q�cf���H��{ ?u�=?�?ǯ���}Z��z���hmΔ�BFTW�����<�q�(v� ��!��z���iW]*�J�V�z��gX֧A�q�&��/w���u�gYӘa���; �i=����g:��?2�dž6�ى�k�4�>�Pxs����}������G�9��3 ���)gG�R<>r h�$��'nc�h�P��Bj��J�ҧH� -��N1���N��?��~��}-q!=��_2hc�M��l�vY%UE�@|�v����M2�.Y[|y�"Eï��K�ZF,�ɯ?,q�?v�M 80jx�"�;�9vk�����+ ֧�� �ȺU��?�%�vcV��mA�6��Qg^M����A}�3�nl� QRN�l8�kkn�'�����(��M�7m9و�q���%ޟ���*h$Zk"��$�9��: �?U8�Sl��,,|ɒ��xH(ѷ����Gn�/Q�4�P��G�%��Ա8�N��!� �&�7�;���eKM7�4��9R/%����l�c>�x;������>��C�:�����t��h?aKX�bhe�ᜋ^�$�Iհ �hr7%F$�E��Fd���t��5���+�(M6�t����Ü�UU|zW�=a�Ts�Tg������dqP�Q����b'�m���1{|Y����X�N��b �P~��F^F:����k6�"�j!�� �I�r�`��1&�-$�Bevk:y���#yw��I0��x��=D�4��tU���P�ZH��ڠ底taP��6����b>�xa����Q�#� WeF��ŮNj�p�J* mQ�N����*I�-*�ȩ�F�g�3 �5��V�ʊ�ɮ�a��5F���O@{���NX��?����H�]3��1�Ri_u��������ѕ�� 
����0��� F��~��:60�p�͈�S��qX#a�5>���`�o&+�<2�D����: �������ڝ�$�nP���*)�N�|y�Ej�F�5ټ�e���ihy�Z �>���k�bH�a�v��h�-#���!�Po=@k̆IEN��@��}Ll?j�O������߭�ʞ���Q|A07x���wt!xf���I2?Z��<ץ�T���cU�j��]��陎Ltl �}5�ϓ��$�,��O�mˊ�;�@O��jE��j(�ا,��LX���LO���Ц�90�O �.����a��nA���7������j4 ��W��_ٓ���zW�jcB������y՗+EM�)d���N�g6�y1_x��p�$Lv:��9�"z��p���ʙ$��^��JԼ*�ϭ����o���=x�Lj�6�J��u82�A�H�3$�ٕ@�=Vv�]�'�qEz�;I˼��)��=��ɯ���x �/�W(V���p�����$ �m�������u�����񶤑Oqˎ�T����r��㠚x�sr�GC��byp�G��1ߠ�w e�8�$⿄����/�M{*}��W�]˷.�CK\�ުx���/$�WPw���r� |i���&�}�{�X� �>��$-��l���?-z���g����lΆ���(F���h�vS*���b���߲ڡn,|)mrH[���a�3�ר�[1��3o_�U�3�TC�$��(�=�)0�kgP���� ��u�^=��4 �WYCҸ:��vQ�ר�X�à��tk�m,�t*��^�,�}D*� �"(�I��9R����>`�`��[~Q]�#af��i6l��8���6�:,s�s�N6�j"�A4���IuQ��6E,�GnH��zS�HO�uk�5$�I�4��ؤ�Q9�@��C����wp�BGv[]�u�Ov���0I4���\��y�����Q�Ѹ��~>Z��8�T��a��q�ޣ;z��a���/��S��I:�ܫ_�|������>=Z����8:�S��U�I�J��"IY���8%b8���H��:�QO�6�;7�I�S��J��ҌAά3��>c���E+&jf$eC+�z�;��V����� �r���ʺ������my�e���aQ�f&��6�ND��.:��NT�vm�<- u���ǝ\MvZY�N�NT��-A�>jr!S��n�O 1�3�Ns�%�3D@���`������ܟ 1�^c<���� �a�ɽ�̲�Xë#�w�|y�cW�=�9I*H8�p�^(4���՗�k��arOcW�tO�\�ƍR��8����'�K���I�Q�����?5�>[�}��yU�ײ -h��=��% q�ThG�2�)���"ו3]�!kB��*p�FDl�A���,�eEi�H�f�Ps�����5�H:�Փ~�H�0Dت�D�I����h�F3�������c��2���E��9�H��5�zԑ�ʚ�i�X�=:m�xg�hd(�v����׊�9iS��O��d@0ڽ���:�p�5�h-��t�&���X�q�ӕ,��ie�|���7A�2���O%P��E��htj��Y1��w�Ѓ!����  ���� ࢽ��My�7�\�a�@�ţ�J �4�Ȼ�F�@o�̒?4�wx��)��]�P��~�����u�����5�����7X ��9��^ܩ�U;Iꭆ 5 �������eK2�7(�{|��Y׎ �V��\"���Z�1� Z�����}��(�Ǝ"�1S���_�vE30>���p;� ΝD��%x�W�?W?v����o�^V�i�d��r[��/&>�~`�9Wh��y�;���R��� ;;ɮT��?����r$�g1�K����A��C��c��K��l:�'��3 c�ﳯ*"t8�~l��)���m��+U,z��`(�>yJ�?����h>��]��v��ЍG*�{`��;y]��I�T� ;c��NU�fo¾h���/$���|NS���1�S�"�H��V���T���4��uhǜ�]�v;���5�͠x��'C\�SBpl���h}�N����� A�Bx���%��ޭ�l��/����T��w�ʽ]D�=����K���ž�r㻠l4�S�O?=�k �M:� ��c�C�a�#ha���)�ѐxc�s���gP�iG��{+���x���Q���I= �� z��ԫ+ �8"�k�ñ�j=|����c ��y��CF��/��*9ж�h{ �?4�o� ��k�m�Q�N�x��;�Y��4膚�a�w?�6�>e]�����Q�r�:����g�,i"�����ԩA�*M�<�G��b�if��l^M��5� 
�Ҩ�{����6J��ZJ�����P�*�����Y���ݛu�_4�9�I8�7���������,^ToR���m4�H��?�N�S�ѕw��/S��甍�@�9H�S�T��t�ƻ���ʒU��*{Xs�@����f�����֒Li�K{H�w^���������Ϥm�tq���s� ���ք��f:��o~s��g�r��ט� �S�ѱC�e]�x���a��) ���(b-$(�j>�7q�B?ӕ�F��hV25r[7 Y� }L�R��}����*sg+��x�r�2�U=�*'WS��ZDW]�WǞ�<��叓���{�$�9Ou4��y�90-�1�'*D`�c�^o?(�9��u���ݐ��'PI&� f�Jݮ�������:wS����jfP1F:X �H�9dԯ���˝[�_54 �}*;@�ܨ�� ð�yn�T���?�ןd�#���4rG�ͨ��H�1�|-#���Mr�S3��G�3�����)�.᧏3v�z֑��r����$G"�`j �1t��x0<Ɔ�Wh6�y�6��,œ�Ga��gA����y��b��)��h�D��ß�_�m��ü �gG;��e�v��ݝ�nQ� ��C����-�*��o���y�a��M��I�>�<���]obD��"�:���G�A��-\%LT�8���c�)��+y76���o�Q�#*{�(F�⽕�y����=���rW�\p���۩�c���A���^e6��K������ʐ�cVf5$�'->���ՉN"���F�"�UQ@�f��Gb~��#�&�M=��8�ט�JNu9��D��[̤�s�o�~������ G��9T�tW^g5y$b��Y'��س�Ǵ�=��U-2 #�MC�t(�i� �lj�@Q 5�̣i�*�O����s�x�K�f��}\��M{E�V�{�υ��Ƈ�����);�H����I��fe�Lȣr�2��>��W�I�Ȃ6������i��k�� �5�YOxȺ����>��Y�f5'��|��H+��98pj�n�.O�y�������jY��~��i�w'������l�;�s�2��Y��:'lg�ꥴ)o#'Sa�a�K��Z� �m��}�`169�n���"���x��I ��*+� }F<��cГ���F�P�������ֹ*�PqX�x۩��,� ��N�� �4<-����%����:��7����W���u�`����� $�?�I��&����o��o��`v�>��P��"��l���4��5'�Z�gE���8���?��[�X�7(��.Q�-��*���ތL@̲����v��.5���[��=�t\+�CNܛ��,g�SQnH����}*F�G16���&:�t��4ُ"A��̣��$�b �|����#rs��a�����T�� ]�<�j��BS�('$�ɻ� �wP;�/�n��?�ݜ��x�F��yUn�~mL*-�������Xf�wd^�a�}��f�,=t�׵i�.2/wpN�Ep8�OР���•��R�FJ� 55TZ��T �ɭ�<��]��/�0�r�@�f��V��V����Nz�G��^���7hZi����k��3�,kN�e|�vg�1{9]_i��X5y7� 8e]�U����'�-2,���e"����]ot�I��Y_��n�(JҼ��1�O ]bXc���Nu�No��pS���Q_���_�?i�~�x h5d'�(qw52] ��'ޤ�q��o1�R!���`ywy�A4u���h<קy���\[~�4�\ X�Wt/� 6�����n�F�a8��f���z �3$�t(���q��q�x��^�XWeN'p<-v�!�{�(>ӽDP7��ո0�y)�e$ٕv�Ih'Q�EA�m*�H��RI��=:��� ���4牢) �%_iN�ݧ�l]� �Nt���G��H�L��� ɱ�g<���1V�,�J~�ٹ�"K��Q�� 9�HS�9�?@��k����r�;we݁�]I�!{ �@�G�[�"��`���J:�n]�{�cA�E����V��ʆ���#��U9�6����j�#Y�m\��q�e4h�B�7��C�������d<�?J����1g:ٳ���=Y���D�p�ц� ׈ǔ��1�]26؜oS�'��9�V�FVu�P�h�9�xc�oq�X��p�o�5��Ա5$�9W�V(�[Ak�aY錎qf;�'�[�|���b�6�Ck��)��#a#a˙��8���=äh�4��2��C��4tm^ �n'c���]GQ$[Wҿ��i���vN�{Fu ��1�gx��1┷���N�m��{j-,��x�� 
Ūm�ЧS�[�s���Gna���䑴�� x�p 8<������97�Q���ϴ�v�aϚG��Rt�Һ׈�f^\r��WH�JU�7Z���y)�vg=����n��4�_)y��D'y�6�]�c�5̪�\� �PF�k����&�c;��cq�$~T�7j ���nç]�<�g ":�to�t}�159�<�/�8������m�b�K#g'I'.W�����6��I/��>v��\�MN��g���m�A�yQL�4u�Lj�j9��#44�t��l^�}L����n��R��!��t��±]��r��h6ٍ>�yҏ�N��fU�� ���� Fm@�8}�/u��jb9������he:A�y�ծw��GpΧh�5����l}�3p468��)U��d��c����;Us/�֔�YX�1�O2��uq�s��`hwg�r~�{ R��mhN��؎*q 42�*th��>�#���E����#��Hv�O����q�}�����6�e��\�,Wk�#���X��b>��p}�դ��3���T5��†��6��[��@�P�y*n��|'f�֧>�lư΂�̺����SU�'*�q�p�_S�����M�� '��c�6�����m�� ySʨ;M��r���Ƌ�m�Kxo,���Gm�P��A�G�:��i��w�9�}M(�^�V��$ǒ�ѽ�9���|���� �a����J�SQ�a���r�B;����}���ٻ֢�2�%U���c�#�g���N�a�ݕ�'�v�[�OY'��3L�3�;,p�]@�S��{ls��X�'���c�jw�k'a�.��}�}&�� �dP�*�bK=ɍ!����;3n�gΊU�ߴmt�'*{,=SzfD� A��ko~�G�aoq�_mi}#�m�������P�Xhύ����mxǍ�΂���巿zf��Q���c���|kc�����?���W��Y�$���_Lv����l߶��c���`?����l�j�ݲˏ!V��6����U�Ђ(A���4y)H���p�Z_�x��>���e��R��$�/�`^'3qˏ�-&Q�=?��CFVR �D�fV�9��{�8g�������n�h�(P"��6�[�D���< E�����~0<@�`�G�6����Hг�cc�� �c�K.5��D��d�B���`?�XQ��2��ٿyqo&+�1^� DW�0�ꊩ���G�#��Q�nL3��c���������/��x ��1�1[y�x�პCW��C�c�UĨ80�m�e�4.{�m��u���I=��f�����0QRls9���f���������9���~f�����Ǩ��a�"@�8���ȁ�Q����#c�ic������G��$���G���r/$W�(��W���V�"��m�7�[m�A�m����bo��D� j����۳� l���^�k�h׽����� ��#� iXn�v��eT�k�a�^Y�4�BN��ĕ��0 !01@Q"2AaPq3BR������?���@4�Q�����T3,���㺠�W�[=JK�Ϟ���2�r^7��vc�:�9 �E�ߴ�w�S#d���Ix��u��:��Hp��9E!�� V 2;73|F��9Y���*ʬ�F��D����u&���y؟��^EA��A��(ɩ���^��GV:ݜDy�`��Jr29ܾ�㝉��[���E;Fzx��YG��U�e�Y�C���� ����v-tx����I�sם�Ę�q��Eb�+P\ :>�i�C'�;�����k|z�رn�y]�#ǿb��Q��������w�����(�r|ӹs��[�D��2v-%��@;�8<a���[\o[ϧw��I!��*0�krs)�[�J9^��ʜ��p1)� "��/_>��o��<1����A�E�y^�C��`�x1'ܣn�p��s`l���fQ��):�l����b>�Me�jH^?�kl3(�z:���1ŠK&?Q�~�{�ٺ�h�y���/�[��V�|6��}�KbX����mn[-��7�5q�94�������dm���c^���h� X��5��<�eޘ>G���-�}�دB�ޟ� ��|�rt�M��V+�]�c?�-#ڛ��^ǂ}���Lkr���O��u�>�-D�ry� D?:ޞ�U��ǜ�7�V��?瓮�"�#���r��չģVR;�n���/_� ؉v�ݶe5d�b9��/O��009�G���5n�W����JpA�*�r9�>�1��.[t���s�F���nQ� V 
77R�]�ɫ8����_0<՜�IF�u(v��4��F�k�3��E)��N:��yڮe��P�`�1}�$WS��J�SQ�N�j�ٺ��޵�#l���ј(�5=��5�lǏmoW�v-�1����v,W�mn��߀$x�<����v�j(����c]��@#��1������Ǔ���o'��u+����;G�#�޸��v-lη��/(`i⣍Pm^���ԯ̾9Z��F��������n��1��� ��]�[��)�'������:�֪�W��FC����� �B9،!?���]��V��A�Վ�M��b�w��G F>_DȬ0¤�#�QR�[V��kz���m�w�"��9ZG�7'[��=�Q����j8R?�zf�\a�=��O�U����*oB�A�|G���2�54 �p��.w7� �� ��&������ξxGHp� B%��$g�����t�Џ򤵍z���HN�u�Я�-�'4��0��;_��3 !01"@AQa2Pq#3BR������?��ʩca��en��^��8���<�u#��m*08r��y�N"�<�Ѳ0��@\�p��� �����Kv�D��J8�Fҽ� �f�Y��-m�ybX�NP����}�!*8t(�OqѢ��Q�wW�K��ZD��Δ^e��!� ��B�K��p~�����e*l}z#9ң�k���q#�Ft�o��S�R����-�w�!�S���Ӥß|M�l޶V��!eˈ�8Y���c�ЮM2��tk���� ������J�fS����Ö*i/2�����n]�k�\���|4yX�8��U�P.���Ы[���l��@"�t�<������5�lF���vU�����W��W��;�b�cД^6[#7@vU�xgZv��F�6��Q,K�v��� �+Ъ��n��Ǣ��Ft���8��0��c�@�!�Zq s�v�t�;#](B��-�nῃ~���3g������5�J�%���O������n�kB�ĺ�.r��+���#�N$?�q�/�s�6��p��a����a��J/��M�8��6�ܰ"�*������ɗud"\w���aT(����[��F��U՛����RT�b���n�*��6���O��SJ�.�ij<�v�MT��R\c��5l�sZB>F��<7�;EA��{��E���Ö��1U/�#��d1�a�n.1ě����0�ʾR�h��|�R��Ao�3�m3 ��%�� ���28Q� ��y��φ���H�To�7�lW>����#i`�q���c����a��� �m,B�-j����݋�'mR1Ήt�>��V��p���s�0IbI�C.���1R�ea�����]H�6����������4B>��o��](��$B���m�����a�!=��?�B� K�Ǿ+�Ծ"�n���K��*��+��[T#�{E�J�S����Q�����s�5�:�U�\wĐ�f�3����܆&�)����I���Ԇw��E T�lrTf6Q|R�h:��[K�� �z��c֧�G�C��%\��_�a�84��HcO�bi��ؖV��7H �)*ģK~Xhչ0��4?�0��� �E<���}3���#���u�?�� ��|g�S�6ꊤ�|�I#Hڛ� �ա��w�X��9��7���Ŀ%�SL��y6č��|�F�a 8���b��$�sק�h���b9RAu7�˨p�Č�_\*w��묦��F ����4D~�f����|(�"m���NK��i�S�>�$d7SlA��/�²����SL��|6N�}���S�˯���g��]6��; �#�.��<���q'Q�1|KQ$�����񛩶"�$r�b:���N8�w@��8$�� �AjfG|~�9F ���Y��ʺ��Bwؒ������M:I岎�G��`s�YV5����6��A �b:�W���G�q%l�����F��H���7�������Fsv7��k�� 403WebShell
403Webshell
Server IP : 82.112.239.65  /  Your IP : 216.73.217.4
Web Server : LiteSpeed
System : Linux in-mum-web1675.main-hosting.eu 5.14.0-503.38.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Fri Apr 18 08:52:10 EDT 2025 x86_64
User : u700808869 ( 700808869)
PHP Version : 8.0.30
Disable Function : system, exec, shell_exec, passthru, mysql_list_dbs, ini_alter, dl, symlink, link, chgrp, leak, popen, apache_child_terminate, virtual, mb_send_mail
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /proc/thread-self/root/opt/gsutil/gslib/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /proc/thread-self/root/opt/gsutil/gslib/tests/test_iam.py
# -*- coding: utf-8 -*-
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Integration tests for the iam command."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

from collections import defaultdict
import json
import os
import subprocess

from gslib.commands import iam
from gslib.exception import CommandException
from gslib.project_id import PopulateProjectId
import gslib.tests.testcase as testcase
from gslib.tests.testcase.integration_testcase import SkipForS3
from gslib.tests.testcase.integration_testcase import SkipForXML
from gslib.tests.util import GenerationFromURI as urigen
from gslib.tests.util import SetBotoConfigForTest
from gslib.tests.util import SetEnvironmentForTest
from gslib.tests.util import unittest
from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
from gslib.utils import shim_util
from gslib.utils.constants import UTF8
from gslib.utils.iam_helper import BindingsMessageToUpdateDict
from gslib.utils.iam_helper import BindingsDictToUpdateDict
from gslib.utils.iam_helper import BindingStringToTuple as bstt
from gslib.utils.iam_helper import DiffBindings
from gslib.utils.iam_helper import IsEqualBindings
from gslib.utils.iam_helper import PatchBindings
from gslib.utils.retry_util import Retry
from six import add_move, MovedModule

add_move(MovedModule('mock', 'mock', 'unittest.mock'))
from six.moves import mock

# Short alias for the apitools message class that represents one entry of a
# Policy's "bindings" list; used by the tests below to build bindings.
bvle = apitools_messages.Policy.BindingsValueListEntry

# Feature iam_bucket_roles must be turned on in bigstore dev config for setting
# the new IAM policies on buckets.
IAM_BUCKET_READ_ROLE_ABBREV = 'legacyBucketReader'
# Fully-qualified role name built from the abbreviation above.
IAM_BUCKET_READ_ROLE = 'roles/storage.%s' % IAM_BUCKET_READ_ROLE_ABBREV
# GCS IAM does not currently support new object-level roles.
IAM_OBJECT_READ_ROLE = 'roles/storage.legacyObjectReader'
IAM_OBJECT_VIEWER_ROLE = 'roles/storage.objectViewer'

# Sample values, presumably for the "description", "expression" and "title"
# keys of a binding condition -- confirm against the tests that use them.
TEST_CONDITION_DESCRIPTION = 'Description for our test condition.'
TEST_CONDITION_EXPR_RESOURCE_IS_OBJECT = (
    'resource.type == "google.cloud.storage.Object"')
TEST_CONDITION_TITLE = 'Test Condition Title'


def gen_binding(role, members=None, condition=None):
  """Build the "bindings" list of an IAM Policy dict with a single entry.

  The returned list holds exactly one dict shaped like a
  storage_v1_messages.Policy.BindingsValueListEntry, populated from the
  arguments given here.

  Args:
    role: (str) An IAM policy role (e.g. "roles/storage.objectViewer"). Fully
        specified in BindingsValueListEntry.
    members: (List[str]) Members to bind (e.g. ["user:foo@bar.com"]). When
        None, the binding targets ["allUsers"]. The list object is stored as
        given, not copied.
    condition: (Dict) JSON-style dict describing a binding condition, with the
        keys "description", "expression", and "title". Omitted from the
        result when falsy.

  Returns:
    (List[Dict[str, Any]]) A one-element list representing the "bindings"
    portion of an IAM Policy.
  """
  if members is None:
    members = ['allUsers']
  entry = {'members': members, 'role': role}
  if not condition:
    return [entry]
  entry['condition'] = condition
  return [entry]


def patch_binding(policy, role, new_policy):
  """Return a copy of a Policy dict with one role's bindings replaced.

  Every existing binding whose "role" equals the given role is dropped and the
  entries of new_policy are appended in its place. The input policy dict and
  its "bindings" list are left unmodified; only a shallow copy is changed.

  Args:
    policy: Python dict representation of a Policy instance.
    role: An IAM policy role (e.g. "roles/storage.objectViewer"). Fully
          specified in BindingsValueListEntry.
    new_policy: List of binding dicts to append; typically a single
                BindingsValueListEntry-shaped entry.

  Returns:
    A Python dict representation of the patched IAM Policy object.
  """
  kept = []
  for existing in policy.get('bindings', []):
    # A binding without a "role" key is treated as having the empty role.
    if existing.get('role', '') == role:
      continue
    kept.append(existing)
  kept.extend(new_policy)

  patched = dict(policy)
  patched['bindings'] = kept
  return patched


class TestIamIntegration(testcase.GsUtilIntegrationTestCase):
  """Shared base class for the iam integration test cases."""

  def assertEqualsPoliciesString(self, a, b):
    """Asserts that two JSON-serialized policies carry equal bindings.

    Only the "members" and "role" of each binding are compared.

    Args:
      a: JSON string of the expected policy; must contain a "bindings" list.
      b: JSON string of the actual policy; must contain a "bindings" list.
    """

    def _to_messages(serialized_policy):
      # Rebuild each JSON binding as a BindingsValueListEntry message.
      messages = []
      for entry in json.loads(serialized_policy)['bindings']:
        messages.append(bvle(members=entry['members'], role=entry['role']))
      return messages

    expected = _to_messages(a)
    result = _to_messages(b)
    self.assertTrue(IsEqualBindings(expected, result))


@SkipForS3('Tests use GS IAM model.')
@SkipForXML('XML IAM control is not supported.')
class TestIamHelpers(testcase.GsUtilUnitTestCase):
  """Unit tests for iam command helper."""

  def test_convert_bindings_simple(self):
    """Checks message-list to update-dict conversion for trivial inputs."""
    # No bindings at all converts to an empty mapping.
    self.assertEqual(BindingsMessageToUpdateDict([]), defaultdict(set))

    # One role with one member maps that role to a one-element set.
    expected = defaultdict(set)
    expected['x'] = {'y'}
    single_binding = [bvle(role='x', members=['y'])]
    self.assertEqual(BindingsMessageToUpdateDict(single_binding), expected)

  def test_convert_bindings_duplicates(self):
    """Checks that repeated roles and repeated members collapse per role."""
    expected = defaultdict(set)
    expected['x'] = {'y', 'z'}
    cases = (
        # The same role spread over two entries.
        [bvle(role='x', members=['y']),
         bvle(role='x', members=['z'])],
        # Member 'z' listed in both entries of the same role.
        [bvle(role='x', members=['z', 'y']),
         bvle(role='x', members=['z'])],
    )
    for bindings in cases:
      self.assertEqual(BindingsMessageToUpdateDict(bindings), expected)

  def test_convert_bindings_dict_simple(self):
    """Checks dict-list to update-dict conversion for trivial inputs."""
    # No bindings at all converts to an empty mapping.
    self.assertEqual(BindingsDictToUpdateDict([]), defaultdict(set))

    # One role with one member maps that role to a one-element set.
    expected = defaultdict(set)
    expected['x'] = {'y'}
    single_binding = [{'role': 'x', 'members': ['y']}]
    self.assertEqual(BindingsDictToUpdateDict(single_binding), expected)

  def test_convert_bindings_dict_duplicates(self):
    """Checks that repeated roles and members in dict form collapse per role."""
    expected = defaultdict(set)
    expected['x'] = {'y', 'z'}
    cases = (
        # The same role spread over two entries.
        [{'role': 'x', 'members': ['y']},
         {'role': 'x', 'members': ['z']}],
        # Member 'z' listed in both entries of the same role.
        [{'role': 'x', 'members': ['z', 'y']},
         {'role': 'x', 'members': ['z']}],
    )
    for bindings in cases:
      self.assertEqual(BindingsDictToUpdateDict(bindings), expected)

  def test_equality_bindings_literal(self):
    """Checks that two empty lists, and a list versus itself, compare equal."""
    self.assertTrue(IsEqualBindings([], []))
    same = [bvle(role='x', members=['y'])]
    self.assertTrue(IsEqualBindings(same, same))

  def test_equality_bindings_extra_roles(self):
    """Checks that duplicated or split entries for a role still compare equal."""
    base = [bvle(role='x', members=['x', 'y'])]
    # The same entry listed twice.
    doubled = base + base
    # The same members, one entry per member.
    split = [bvle(role='x', members=[member]) for member in ('y', 'x')]
    for variant in (doubled, split):
      self.assertTrue(IsEqualBindings(base, variant))

  def test_diff_bindings_add_role(self):
    """Checks that diffing from no bindings reports the new role as granted."""
    added = [bvle(role='x', members=['y'])]
    granted, removed = DiffBindings([], added)
    self.assertEqual(granted.bindings, added)
    self.assertEqual(removed.bindings, [])

  def test_diff_bindings_drop_role(self):
    """Checks that diffing down to no bindings reports the role as removed."""
    dropped = [bvle(role='x', members=['y'])]
    granted, removed = DiffBindings(dropped, [])
    self.assertEqual(granted.bindings, [])
    self.assertEqual(removed.bindings, dropped)

  def test_diff_bindings_swap_role(self):
    """Checks that swapping bindings grants the new one and removes the old."""
    before = [bvle(role='x', members=['y'])]
    after = [bvle(role='a', members=['b'])]
    granted, removed = DiffBindings(before, after)
    self.assertEqual(granted.bindings, after)
    self.assertEqual(removed.bindings, before)

  def test_diff_bindings_add_member(self):
    """Checks that only the newly added member of a role shows up as granted."""
    before = [bvle(role='x', members=['y'])]
    after = [bvle(role='x', members=['z', 'y'])]
    granted, removed = DiffBindings(before, after)
    # 'y' was already present, so only 'z' counts as a grant.
    self.assertEqual(granted.bindings, [bvle(role='x', members=['z'])])
    self.assertEqual(removed.bindings, [])

  def test_diff_bindings_drop_member(self):
    """Checks that only the member dropped from a role shows up as removed."""
    before = [bvle(role='x', members=['z', 'y'])]
    after = [bvle(role='x', members=['y'])]
    granted, removed = DiffBindings(before, after)
    self.assertEqual(granted.bindings, [])
    # 'y' is kept, so only 'z' counts as a removal.
    self.assertEqual(removed.bindings, [bvle(role='x', members=['z'])])

  def test_diff_bindings_swap_member(self):
    """Checks that swapping a role's member grants the new and removes the old."""
    before = [bvle(role='x', members=['z'])]
    after = [bvle(role='x', members=['y'])]
    granted, removed = DiffBindings(before, after)
    self.assertEqual(granted.bindings, after)
    self.assertEqual(removed.bindings, before)

  def test_patch_bindings_grant(self):
    """Checks that a grant patch adds the diff's role to the base bindings."""
    existing = [
        bvle(role=role, members=['user:foo@bar.com'])
        for role in ('a', 'b', 'c')
    ]
    addition = [bvle(role='d', members=['user:foo@bar.com'])]

    base = BindingsMessageToUpdateDict(existing)
    diff = BindingsMessageToUpdateDict(addition)
    # The expected result is the conversion of both lists combined.
    expected = BindingsMessageToUpdateDict(existing + addition)

    # Third argument True requests a grant, as opposed to a removal.
    self.assertEqual(PatchBindings(base, diff, True), expected)

  def test_patch_bindings_remove(self):
    """Checks that a removal patch drops only the role named in the diff."""

    def update_dict(*roles):
      # One binding per role, each for the same single user.
      return BindingsMessageToUpdateDict(
          [bvle(members=['user:foo@bar.com'], role=role) for role in roles])

    base = update_dict('a', 'b', 'c')
    diff = update_dict('a')
    expected = update_dict('b', 'c')

    # Third argument False requests a removal, as opposed to a grant.
    self.assertEqual(PatchBindings(base, diff, False), expected)

  def test_patch_bindings_remove_all(self):
    """Verifies both ways of stripping every role from a member."""
    member = 'user:foo@bar.com'
    all_roles = ('a', 'b', 'c')
    base = BindingsMessageToUpdateDict(
        [bvle(members=[member], role=name) for name in all_roles])

    # A removal diff with an empty role must leave no bindings behind.
    drop_all = BindingsMessageToUpdateDict([bvle(members=[member], role='')])
    self.assertEqual(PatchBindings(base, drop_all, False), {})

    # Naming each of the member's roles explicitly must give the same result.
    drop_each = BindingsMessageToUpdateDict(
        [bvle(members=[member], role=name) for name in all_roles])
    self.assertEqual(PatchBindings(base, drop_each, False), {})

  def test_patch_bindings_multiple_users(self):
    """Verifies a removal leaves other members of a shared role in place."""
    target = 'user:foo@bar.com'
    bystander = 'user:fii@bar.com'
    expected = BindingsMessageToUpdateDict(
        [bvle(members=[bystander], role='b')])
    base = BindingsMessageToUpdateDict([
        bvle(members=[target], role='a'),
        bvle(members=[target, bystander], role='b'),
        bvle(members=[target], role='c'),
    ])
    # Remove the target from every role it holds, including the shared one.
    diff = BindingsMessageToUpdateDict(
        [bvle(members=[target], role=name) for name in ('a', 'b', 'c')])
    patched = PatchBindings(base, diff, False)
    self.assertEqual(patched, expected)

  def test_patch_bindings_grant_all_users(self):
    """Verifies granting a role to allUsers keeps the role's existing member."""
    user = 'user:foo@bar.com'
    base = BindingsMessageToUpdateDict(
        [bvle(role=name, members=[user]) for name in ('a', 'b', 'c')])
    diff = BindingsMessageToUpdateDict([bvle(role='a', members=['allUsers'])])
    expected = BindingsMessageToUpdateDict([
        bvle(role='a', members=['allUsers', user]),
        bvle(role='b', members=[user]),
        bvle(role='c', members=[user]),
    ])

    patched = PatchBindings(base, diff, True)
    self.assertEqual(patched, expected)

  def test_patch_bindings_public_member_overwrite(self):
    """Verifies granting one public member does not displace another."""
    existing = [bvle(role='a', members=['allUsers'])]
    base = BindingsMessageToUpdateDict(existing)
    addition = [bvle(role='a', members=['allAuthenticatedUsers'])]
    diff = BindingsMessageToUpdateDict(addition)

    patched = PatchBindings(base, diff, True)
    # Both public members are expected on the role after the grant.
    expected = BindingsMessageToUpdateDict(existing + addition)
    self.assertEqual(patched, expected)

  def test_valid_public_member_single_role(self):
    """Verifies a lower-cased public member with a single role is parsed."""
    _, parsed = bstt(True, 'allusers:admin')
    wanted = {'members': ['allUsers'], 'role': 'roles/storage.admin'}
    self.assertEqual(len(parsed), 1)
    self.assertIn(wanted, parsed)

  def test_grant_no_role_error(self):
    """Verifies every grant string that lacks a role is rejected."""
    roleless_grants = (
        'allUsers',
        'user:foo@bar.com',
        'user:foo@bar.com:',
        'deleted:user:foo@bar.com?uid=1234:',
    )
    for binding_string in roleless_grants:
      with self.assertRaises(CommandException):
        bstt(True, binding_string)

  def test_remove_all_roles(self):
    """Tests parsing a -d allUsers or -d user:foo@bar.com request.

    A removal that names no role is expected to parse into a single binding
    whose role is the empty string.
    """
    # Input specifies remove all roles from allUsers.
    (is_grant, bindings) = bstt(False, 'allUsers')
    self.assertEqual(len(bindings), 1)
    self.assertIn({'members': ['allUsers'], 'role': ''}, bindings)
    # A trailing colon with no role must parse identically.
    self.assertEqual((is_grant, bindings), bstt(False, 'allUsers:'))

    # Input specifies remove all roles from a user.
    (_, bindings) = bstt(False, 'user:foo@bar.com')
    self.assertEqual(len(bindings), 1)
    # Check the parsed binding itself, not just the count, as is done for the
    # allUsers case above.
    self.assertIn({'members': ['user:foo@bar.com'], 'role': ''}, bindings)

  def test_valid_multiple_roles(self):
    """Verifies several comma-separated roles parse into separate bindings."""
    _, parsed = bstt(True, 'allUsers:a,b,c,roles/custom')
    self.assertEqual(len(parsed), 4)
    # Abbreviated roles appear with the roles/storage. prefix; the role given
    # in full is expected unchanged.
    wanted_roles = (
        'roles/storage.a',
        'roles/storage.b',
        'roles/storage.c',
        'roles/custom',
    )
    for role_name in wanted_roles:
      self.assertIn({'members': ['allUsers'], 'role': role_name}, parsed)

  def test_valid_custom_roles(self):
    """Verifies fully-qualified custom roles are parsed for a single user."""
    _, parsed = bstt(True, 'user:foo@bar.com:roles/custom1,roles/custom2')
    self.assertEqual(len(parsed), 2)
    for role_name in ('roles/custom1', 'roles/custom2'):
      wanted = {'members': ['user:foo@bar.com'], 'role': role_name}
      self.assertIn(wanted, parsed)

  def test_valid_member(self):
    """Verifies a capitalized member type prefix is parsed to lower case."""
    _, parsed = bstt(True, 'User:foo@bar.com:admin')
    wanted = {'members': ['user:foo@bar.com'], 'role': 'roles/storage.admin'}
    self.assertEqual(len(parsed), 1)
    self.assertIn(wanted, parsed)

  def test_valid_deleted_member(self):
    """Verifies parsing of deleted members, including mixed-case prefixes."""
    simple_member = 'deleted:user:foo@bar.com?uid=123'

    # Removal with no role listed.
    _, parsed = bstt(False, 'Deleted:User:foo@bar.com?uid=123')
    self.assertEqual(len(parsed), 1)
    self.assertIn({'members': [simple_member], 'role': ''}, parsed)

    # Grant of a single role.
    _, parsed = bstt(True, 'deleted:User:foo@bar.com?uid=123:admin')
    self.assertEqual(len(parsed), 1)
    self.assertIn({
        'members': [simple_member],
        'role': 'roles/storage.admin'
    }, parsed)

    # These emails can actually have multiple query params
    multi_param_member = 'deleted:user:foo@bar.com?query=param,uid=123?uid=456'
    _, parsed = bstt(
        True,
        'deleted:user:foo@bar.com?query=param,uid=123?uid=456:admin,admin2')
    self.assertEqual(len(parsed), 2)
    for role_name in ('roles/storage.admin', 'roles/storage.admin2'):
      self.assertIn({'members': [multi_param_member], 'role': role_name},
                    parsed)

  def test_duplicate_roles(self):
    """Verifies a role listed twice produces only one binding."""
    _, parsed = bstt(True, 'allUsers:a,a')
    wanted = {'members': ['allUsers'], 'role': 'roles/storage.a'}
    self.assertEqual(len(parsed), 1)
    self.assertIn(wanted, parsed)

  def test_removing_project_convenience_groups(self):
    """Verifies removal strings for project convenience members are parsed."""
    convenience_member = 'projectViewer:123424'

    # Removal of one named role.
    _, parsed = bstt(False, 'projectViewer:123424:admin')
    self.assertEqual(len(parsed), 1)
    self.assertIn({
        'members': [convenience_member],
        'role': 'roles/storage.admin'
    }, parsed)

    # Removal with no role listed.
    _, parsed = bstt(False, 'projectViewer:123424')
    self.assertEqual(len(parsed), 1)
    self.assertIn({'members': [convenience_member], 'role': ''}, parsed)

  def test_adding_project_convenience_groups(self):
    """Verifies a grant to a project convenience member is rejected."""
    grant_string = 'projectViewer:123424:admin'
    with self.assertRaises(CommandException):
      bstt(True, grant_string)

  def test_invalid_input(self):
    """Verifies each malformed grant string raises CommandException."""
    malformed_grants = (
        'non_valid_public_member:role',
        'non_valid_type:id:role',
        'user:r',
        'deleted:user',
        'deleted:not_a_type',
        'deleted:user:foo@no_uid_suffix',
    )
    for binding_string in malformed_grants:
      with self.assertRaises(CommandException):
        bstt(True, binding_string)

  def test_invalid_n_args(self):
    """Verifies grant strings with too many colons raise CommandException."""
    overlong_grants = (
        'allUsers:some_id:some_role',
        'user:foo@bar.com:r:nonsense',
        'deleted:user:foo@bar.com?uid=1234:r:nonsense',
    )
    for binding_string in overlong_grants:
      with self.assertRaises(CommandException):
        bstt(True, binding_string)


@SkipForS3('Tests use GS IAM model.')
@SkipForXML('XML IAM control is not supported.')
class TestIamCh(TestIamIntegration):
  """Integration tests for iam ch command."""

  def setUp(self):
    """Creates two buckets and two objects and records their IAM policies."""
    super(TestIamCh, self).setUp()
    self.bucket = self.CreateBucket()
    self.bucket2 = self.CreateBucket()
    self.object = self.CreateObject(bucket_uri=self.bucket, contents=b'foo')
    self.object2 = self.CreateObject(bucket_uri=self.bucket, contents=b'bar')

    # Policies as they are before any test modifies them.
    self.bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                            return_stdout=True)
    self.object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                            return_stdout=True)
    self.object2_iam_string = self.RunGsUtil(['iam', 'get', self.object2.uri],
                                             return_stdout=True)

    self.user = 'user:foo@bar.com'
    self.user2 = 'user:bar@foo.com'

  def test_patch_no_role(self):
    """Tests expected failure if no bindings are listed."""
    stderr = self.RunGsUtil(['iam', 'ch', self.bucket.uri],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('CommandException', stderr)

  def test_raises_error_message_for_d_flag_missing_argument(self):
    """Tests expected failure if a -d flag is not followed by a binding."""
    stderr = self.RunGsUtil(
        ['iam', 'ch',
         '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), '-d'],
        return_stderr=True,
        expected_status=1)
    self.assertIn(
        'A -d flag is missing an argument specifying bindings to remove.',
        stderr)

  def test_path_mix_of_buckets_and_objects(self):
    """Tests expected failure if both buckets and objects are provided."""
    stderr = self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri,
        self.object.uri
    ],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('CommandException', stderr)

  def test_path_file_url(self):
    """Tests expected failure is caught when a file url is provided."""
    stderr = self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), 'file://somefile'
    ],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('AttributeError', stderr)

  def test_patch_single_grant_single_bucket(self):
    """Tests granting single role."""
    self.assertHasNo(self.bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_repeated_grant(self):
    """Granting multiple times for the same member will have no effect."""
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_single_remove_single_bucket(self):
    """Tests removing a single role."""
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])
    self.RunGsUtil([
        'iam', 'ch', '-d',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHasNo(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_null_remove(self):
    """Removing a non-existent binding will have no effect."""
    self.RunGsUtil([
        'iam', 'ch', '-d',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHasNo(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.assertEqualsPoliciesString(bucket_iam_string, self.bucket_iam_string)

  def test_patch_mixed_grant_remove_single_bucket(self):
    """Tests that mixing grant and remove requests will succeed."""
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user2, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), '-d',
        '%s:%s' % (self.user2, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.assertHasNo(bucket_iam_string, self.user2, IAM_BUCKET_READ_ROLE)

  def test_patch_public_grant_single_bucket(self):
    """Test public grant request interacts properly with existing members."""
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])
    self.RunGsUtil([
        'iam', 'ch',
        'allUsers:%s' % IAM_BUCKET_READ_ROLE_ABBREV, self.bucket.uri
    ])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHas(bucket_iam_string, 'allUsers', IAM_BUCKET_READ_ROLE)
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_remove_all_roles(self):
    """Remove with no roles specified will remove member from all bindings."""
    self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri
    ])
    self.RunGsUtil(['iam', 'ch', '-d', self.user, self.bucket.uri])

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHasNo(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_single_object(self):
    """Tests object IAM patch behavior."""
    self.assertHasNo(self.object_iam_string, self.user, IAM_OBJECT_READ_ROLE)
    self.RunGsUtil(
        ['iam', 'ch',
         '%s:legacyObjectReader' % self.user, self.object.uri])

    object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                       return_stdout=True)
    self.assertHas(object_iam_string, self.user, IAM_OBJECT_READ_ROLE)

  def test_patch_multithreaded_single_object(self):
    """Tests the edge-case behavior of multithreaded execution."""
    self.assertHasNo(self.object_iam_string, self.user, IAM_OBJECT_READ_ROLE)
    self.RunGsUtil([
        '-m', 'iam', 'ch',
        '%s:legacyObjectReader' % self.user, self.object.uri
    ])

    object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                       return_stdout=True)
    self.assertHas(object_iam_string, self.user, IAM_OBJECT_READ_ROLE)

  def test_patch_invalid_input(self):
    """Tests that listing bindings after a bucket will throw an error."""
    stderr = self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri,
        '%s:%s' % (self.user2, IAM_BUCKET_READ_ROLE_ABBREV)
    ],
                            return_stderr=True,
                            expected_status=1)
    self.assertIn('CommandException', stderr)

    # The binding listed before the bucket is expected to have been applied;
    # the one listed after it is not.
    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.assertHasNo(bucket_iam_string, self.user2, IAM_BUCKET_READ_ROLE)

  def test_patch_disallowed_binding_type(self):
    """Tests that we disallow certain binding types with appropriate err."""
    stderr = self.RunGsUtil(
        ['iam', 'ch', 'projectOwner:my-project:admin', self.bucket.uri],
        return_stderr=True,
        expected_status=1)
    self.assertIn('not supported', stderr)

  def test_patch_remove_disallowed_binding_type(self):
    """Tests that we can remove project convenience values."""
    # Set up the bucket to include a disallowed member which we can then remove.
    disallowed_member = 'projectViewer:%s' % PopulateProjectId()
    policy_file_path = self.CreateTempFile(contents=json.dumps(
        patch_binding(
            json.loads(self.bucket_iam_string), IAM_OBJECT_READ_ROLE,
            gen_binding(IAM_OBJECT_READ_ROLE, members=[disallowed_member
                                                      ]))).encode(UTF8))
    self.RunGsUtil(['iam', 'set', policy_file_path, self.bucket.uri])
    # Confirm the disallowed member was actually added.
    iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                return_stdout=True)
    self.assertHas(iam_string, disallowed_member, IAM_OBJECT_READ_ROLE)
    # Use iam ch to remove the disallowed member.
    self.RunGsUtil(['iam', 'ch', '-d', disallowed_member, self.bucket.uri])
    # Confirm the disallowed member was actually removed.
    iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                return_stdout=True)
    self.assertHasNo(iam_string, disallowed_member, IAM_OBJECT_READ_ROLE)

  def test_patch_multiple_objects(self):
    """Tests IAM patch against multiple objects."""
    self.RunGsUtil([
        'iam', 'ch', '-r',
        '%s:legacyObjectReader' % self.user, self.bucket.uri
    ])

    object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                       return_stdout=True)
    object2_iam_string = self.RunGsUtil(['iam', 'get', self.object2.uri],
                                        return_stdout=True)
    self.assertHas(object_iam_string, self.user, IAM_OBJECT_READ_ROLE)
    self.assertHas(object2_iam_string, self.user, IAM_OBJECT_READ_ROLE)

  def test_patch_multithreaded_multiple_objects(self):
    """Tests multithreaded behavior against multiple objects."""
    self.RunGsUtil([
        '-m', 'iam', 'ch', '-r',
        '%s:legacyObjectReader' % self.user, self.bucket.uri
    ])

    object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                       return_stdout=True)
    object2_iam_string = self.RunGsUtil(['iam', 'get', self.object2.uri],
                                        return_stdout=True)
    self.assertHas(object_iam_string, self.user, IAM_OBJECT_READ_ROLE)
    self.assertHas(object2_iam_string, self.user, IAM_OBJECT_READ_ROLE)

  def test_patch_error(self):
    """See TestIamSet.test_set_error."""
    stderr = self.RunGsUtil([
        'iam', 'ch',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri,
        'gs://%s' % self.nonexistent_bucket_name, self.bucket2.uri
    ],
                            return_stderr=True,
                            expected_status=1)
    if self._use_gcloud_storage:
      self.assertIn('not found: 404.', stderr)
    else:
      self.assertIn('BucketNotFoundException', stderr)

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    bucket2_iam_string = self.RunGsUtil(['iam', 'get', self.bucket2.uri],
                                        return_stdout=True)

    # The bucket listed before the failing URL is patched; the one listed
    # after it must still match the unmodified policy recorded in setUp.
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.assertEqualsPoliciesString(bucket2_iam_string, self.bucket_iam_string)

  def test_patch_force_error(self):
    """See TestIamSet.test_set_force_error."""
    stderr = self.RunGsUtil([
        'iam', 'ch', '-f',
        '%s:%s' % (self.user, IAM_BUCKET_READ_ROLE_ABBREV), self.bucket.uri,
        'gs://%s' % self.nonexistent_bucket_name, self.bucket2.uri
    ],
                            return_stderr=True,
                            expected_status=1)
    if self._use_gcloud_storage:
      self.assertIn('not found: 404.', stderr)
    else:
      self.assertIn('CommandException', stderr)

    bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                       return_stdout=True)
    bucket2_iam_string = self.RunGsUtil(['iam', 'get', self.bucket2.uri],
                                        return_stdout=True)

    # With -f both existing buckets are expected to be patched despite the
    # failure on the nonexistent one.
    self.assertHas(bucket_iam_string, self.user, IAM_BUCKET_READ_ROLE)
    self.assertHas(bucket2_iam_string, self.user, IAM_BUCKET_READ_ROLE)

  def test_patch_multithreaded_error(self):
    """See TestIamSet.test_set_multithreaded_error."""
    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check1():
      stderr = self.RunGsUtil([
          '-m', 'iam', 'ch', '-r',
          '%s:legacyObjectReader' % self.user,
          'gs://%s' % self.nonexistent_bucket_name, self.bucket.uri
      ],
                              return_stderr=True,
                              expected_status=1)
      if self._use_gcloud_storage:
        self.assertIn('not found: 404.', stderr)
      else:
        self.assertIn('BucketNotFoundException', stderr)

    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check2():
      object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                         return_stdout=True)
      object2_iam_string = self.RunGsUtil(['iam', 'get', self.object2.uri],
                                          return_stdout=True)

      self.assertEqualsPoliciesString(self.object_iam_string, object_iam_string)
      self.assertEqualsPoliciesString(self.object_iam_string,
                                      object2_iam_string)

    _Check1()
    _Check2()

  def test_assert_has(self):
    """Tests the assertHas and assertHasNo helpers against fixed policies."""
    test_policy = {
        'bindings': [{
            'members': ['allUsers'],
            'role': 'roles/storage.admin'
        }, {
            'members': ['user:foo@bar.com', 'serviceAccount:bar@foo.com'],
            'role': IAM_BUCKET_READ_ROLE
        }]
    }

    self.assertHas(json.dumps(test_policy), 'allUsers', 'roles/storage.admin')
    self.assertHas(json.dumps(test_policy), 'user:foo@bar.com',
                   IAM_BUCKET_READ_ROLE)
    self.assertHasNo(json.dumps(test_policy), 'allUsers', IAM_BUCKET_READ_ROLE)
    self.assertHasNo(json.dumps(test_policy), 'user:foo@bar.com',
                     'roles/storage.admin')

    # The same role may appear in more than one binding; members of every
    # such binding must be found, not only those of the last one.
    repeated_role_policy = {
        'bindings': [{
            'members': ['user:foo@bar.com'],
            'role': IAM_BUCKET_READ_ROLE
        }, {
            'members': ['user:bar@foo.com'],
            'role': IAM_BUCKET_READ_ROLE
        }]
    }
    self.assertHas(json.dumps(repeated_role_policy), 'user:foo@bar.com',
                   IAM_BUCKET_READ_ROLE)
    self.assertHas(json.dumps(repeated_role_policy), 'user:bar@foo.com',
                   IAM_BUCKET_READ_ROLE)
    self.assertHasNo(json.dumps(repeated_role_policy), 'allUsers',
                     IAM_BUCKET_READ_ROLE)

  def assertHas(self, policy, member, role):
    """Asserts a member has permission for role.

    Given an IAM policy, check if the specified member is bound to the
    specified role. Does not check group inheritance -- that is, if checking
    against the [{'members': ['allUsers'], 'role': X}] policy, this function
    will still raise an exception when testing for any member other than
    'allUsers' against role X.

    Every binding for the role is inspected, so a member is found even when
    the policy holds several bindings with the same role.

    This function does not invoke the TestIamPolicy endpoints to smartly check
    IAM policy resolution. This function is simply to assert the expected IAM
    policy is returned, not whether or not the IAM policy is being invoked as
    expected.

    Args:
      policy: JSON string of an IAM policy (e.g. the stdout of `iam get`).
      member: A member string (e.g. 'user:foo@bar.com').
      role: A fully specified role (e.g. 'roles/storage.admin')

    Raises:
      AssertionError if member is not bound to role.
    """

    policy = json.loads(policy)
    # Scan the bindings directly instead of indexing them by role: a dict
    # keyed by role would silently keep only the last binding for a role.
    for binding in policy.get('bindings', []):
      if binding['role'] == role and member in binding['members']:
        return
    raise AssertionError('Member \'%s\' does not have permission \'%s\' in '
                         'policy %s' % (member, role, policy))

  def assertHasNo(self, policy, member, role):
    """Functions as logical complement of TestIamCh.assertHas().

    Args:
      policy: JSON string of an IAM policy (e.g. the stdout of `iam get`).
      member: A member string (e.g. 'user:foo@bar.com').
      role: A fully specified role (e.g. 'roles/storage.admin')

    Raises:
      AssertionError if member is bound to role.
    """
    try:
      self.assertHas(policy, member, role)
    except AssertionError:
      # assertHas failing means the member is absent, which is the pass case.
      pass
    else:
      raise AssertionError('Member \'%s\' has permission \'%s\' in '
                           'policy %s' % (member, role, policy))


@SkipForS3('Tests use GS IAM model.')
@SkipForXML('XML IAM control is not supported.')
class TestIamSet(TestIamIntegration):
  """Integration tests for iam set command."""

  # TODO(iam-beta): Replace gen_binding, _patch_binding with generators from
  # iam_helper.
  def setUp(self):
    """Creates test resources and the policy files the tests apply.

    Builds a plain bucket, a versioned bucket and an object, records the
    current bucket and object policies, and writes temp files holding both the
    unmodified policies and altered ones (public-read bindings, and a bucket
    policy containing a conditional binding).
    """
    super(TestIamSet, self).setUp()

    self.public_bucket_read_binding = gen_binding(IAM_BUCKET_READ_ROLE)
    self.public_object_read_binding = gen_binding(IAM_OBJECT_READ_ROLE)
    self.project_viewer_objectviewer_with_cond_binding = gen_binding(
        IAM_OBJECT_VIEWER_ROLE,
        # Note: We use projectViewer:some-project-id here because conditions
        # cannot be applied to a binding that only has allUsers in the members
        # list; the API gives back a 400 error if you try.
        members=['projectViewer:%s' % PopulateProjectId()],
        condition={
            'title': TEST_CONDITION_TITLE,
            'description': TEST_CONDITION_DESCRIPTION,
            'expression': TEST_CONDITION_EXPR_RESOURCE_IS_OBJECT,
        })

    self.bucket = self.CreateBucket()
    self.versioned_bucket = self.CreateVersionedBucket()

    # Fetch the new bucket's policy, used as a base for other policies.
    self.bucket_iam_string = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                            return_stdout=True)
    self.old_bucket_iam_path = self.CreateTempFile(
        contents=self.bucket_iam_string.encode(UTF8))

    # Using the existing bucket's policy, make an altered policy that allows
    # allUsers to be "legacyBucketReader"s. Some tests will later apply this
    # policy.
    self.new_bucket_iam_policy = patch_binding(
        json.loads(self.bucket_iam_string), IAM_BUCKET_READ_ROLE,
        self.public_bucket_read_binding)
    self.new_bucket_iam_path = self.CreateTempFile(
        contents=json.dumps(self.new_bucket_iam_policy).encode(UTF8))

    # Using the existing bucket's policy, make an altered policy that contains
    # a binding with a condition in it. Some tests will later apply this policy.
    self.new_bucket_policy_with_conditions_policy = json.loads(
        self.bucket_iam_string)
    self.new_bucket_policy_with_conditions_policy['bindings'].append(
        self.project_viewer_objectviewer_with_cond_binding[0])
    # Encoded to bytes like every other policy file written in this method.
    self.new_bucket_policy_with_conditions_path = self.CreateTempFile(
        contents=json.dumps(
            self.new_bucket_policy_with_conditions_policy).encode(UTF8))

    # Create an object to fetch its policy, used as a base for other policies.
    # Contents are bytes, matching the other CreateObject calls in this file.
    self.object = self.CreateObject(contents=b'foobar')
    self.object_iam_string = self.RunGsUtil(['iam', 'get', self.object.uri],
                                            return_stdout=True)
    self.old_object_iam_path = self.CreateTempFile(
        contents=self.object_iam_string.encode(UTF8))

    # Using the existing object's policy, make an altered policy that allows
    # allUsers to be "legacyObjectReader"s. Some tests will later apply this
    # policy.
    self.new_object_iam_policy = patch_binding(
        json.loads(self.object_iam_string), IAM_OBJECT_READ_ROLE,
        self.public_object_read_binding)
    self.new_object_iam_path = self.CreateTempFile(
        contents=json.dumps(self.new_object_iam_policy).encode(UTF8))

  def test_seek_ahead_iam(self):
    """Verifies that iam commands report a seek-ahead work estimate."""
    target = self.CreateObject(bucket_uri=self.bucket, contents=b'foobar')

    # These settings force the seek-ahead iterator to be utilized.
    forced_estimation = [
        ('GSUtil', 'task_estimation_threshold', '1'),
        ('GSUtil', 'task_estimation_force', 'True'),
    ]
    command = ['-m', 'iam', 'set', self.new_object_iam_path, target.uri]
    with SetBotoConfigForTest(forced_estimation):
      stderr = self.RunGsUtil(command, return_stderr=True)
      self.assertIn('Estimated work for this command: objects: 1\n', stderr)

  def test_set_mix_of_buckets_and_objects(self):
    """Verifies `iam set` fails when given both a bucket and an object."""
    command = [
        'iam',
        'set',
        self.new_object_iam_path,
        self.bucket.uri,
        self.object.uri,
    ]
    stderr = self.RunGsUtil(command, return_stderr=True, expected_status=1)
    self.assertIn('CommandException', stderr)

  def test_set_file_url(self):
    """Verifies `iam set` fails when the target is a file url."""
    command = ['iam', 'set', self.new_object_iam_path, 'file://somefile']
    stderr = self.RunGsUtil(command, return_stderr=True, expected_status=1)
    self.assertIn('AttributeError', stderr)

  def test_set_invalid_iam_bucket(self):
    """Verifies `iam set` rejects a malformed or missing policy file."""

    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckMalformedPolicy():
      bad_policy_path = self.CreateTempFile(contents=b'badIam')
      stderr = self.RunGsUtil(['iam', 'set', bad_policy_path, self.bucket.uri],
                              return_stderr=True,
                              expected_status=1)
      if self._use_gcloud_storage:
        expected_error = 'Found invalid JSON/YAML'
      else:
        expected_error = 'ArgumentException'
      self.assertIn(expected_error, stderr)

    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckMissingPolicy():
      # Tests that setting with a non-existent file will also return error.
      stderr = self.RunGsUtil(
          ['iam', 'set', 'nonexistent/path', self.bucket.uri],
          return_stderr=True,
          expected_status=1)
      if self._use_gcloud_storage:
        expected_error = 'No such file or directory'
      else:
        expected_error = 'ArgumentException'
      self.assertIn(expected_error, stderr)

    _CheckMalformedPolicy()
    _CheckMissingPolicy()

  def test_get_invalid_bucket(self):
    """Verifies `iam get` fails for bad, missing and wildcard bucket names."""
    # A bare bucket name without the gs:// scheme.
    stderr = self.RunGsUtil(['iam', 'get', self.nonexistent_bucket_name],
                            return_stderr=True,
                            expected_status=1)
    if self._use_gcloud_storage:
      expected_error = 'AttributeError'
    else:
      expected_error = 'CommandException'
    self.assertIn(expected_error, stderr)

    # A well-formed URL naming a bucket that does not exist.
    missing_bucket_url = 'gs://%s' % self.nonexistent_bucket_name
    stderr = self.RunGsUtil(['iam', 'get', missing_bucket_url],
                            return_stderr=True,
                            expected_status=1)
    if self._use_gcloud_storage:
      expected_error = 'not found'
    else:
      expected_error = 'BucketNotFoundException'
    self.assertIn(expected_error, stderr)

    # N.B.: The call to wildcard_iterator.WildCardIterator here will invoke
    # ListBucket, which only promises eventual consistency. We use @Retry here
    # to mitigate errors due to this.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckWildcard():  # pylint: disable=invalid-name
      # There are at least two buckets in the project
      # due to TestIamSet.setUp().
      wildcard_stderr = self.RunGsUtil(['iam', 'get', 'gs://*'],
                                       return_stderr=True,
                                       expected_status=1)
      if self._use_gcloud_storage:
        wildcard_error = 'must match a single cloud resource'
      else:
        wildcard_error = 'CommandException'
      self.assertIn(wildcard_error, wildcard_stderr)

    _CheckWildcard()

  def test_set_valid_iam_bucket(self):
    """Sets a new policy on a bucket and then puts the old policy back."""

    def _apply_policy(policy_path):
      # Every set in this test passes a blank etag (-e '').
      self.RunGsUtil(['iam', 'set', '-e', '', policy_path, self.bucket.uri])

    def _fetch_policy():
      return self.RunGsUtil(['iam', 'get', self.bucket.uri],
                            return_stdout=True)

    _apply_policy(self.new_bucket_iam_path)
    policy_after_set = _fetch_policy()
    _apply_policy(self.old_bucket_iam_path)
    policy_after_reset = _fetch_policy()

    # After the second set the policy must equal self.bucket_iam_string.
    self.assertEqualsPoliciesString(self.bucket_iam_string, policy_after_reset)
    # The policy read after the first set must hold the public-read binding.
    expected_binding = self.public_bucket_read_binding[0]
    self.assertIn(expected_binding, json.loads(policy_after_set)['bindings'])

  @unittest.skip('Disabled until all projects whitelisted for conditions.')
  def test_set_and_get_valid_bucket_policy_with_conditions(self):
    """Sets a bucket policy with conditions and looks for them in `iam get`."""
    set_args = ['iam', 'set', '-e', '']
    set_args.append(self.new_bucket_policy_with_conditions_path)
    set_args.append(self.bucket.uri)
    self.RunGsUtil(set_args)

    policy_json = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                 return_stdout=True)
    # The expression is searched for in a copy with all backslashes removed.
    unescaped_json = policy_json.replace('\\', '')
    expectations = (
        (TEST_CONDITION_DESCRIPTION, policy_json),
        (TEST_CONDITION_EXPR_RESOURCE_IS_OBJECT, unescaped_json),
        (TEST_CONDITION_TITLE, policy_json),
    )
    for expected_text, searched_text in expectations:
      self.assertIn(expected_text, searched_text)

  # Note: We only test this for buckets, since objects cannot currently have
  # conditions in their policy bindings.
  @unittest.skip('Disabled until all projects whitelisted for conditions.')
  def test_ch_fails_after_setting_conditions(self):
    """Tests that if we "set" a policy with conditions, "ch" won't patch it.

    The policy file at self.new_bucket_policy_with_conditions_path is set on
    self.bucket, then `iam ch` is run once without and once with `-f`. Both
    runs are expected to exit with status 1 and to print both the error and
    the workaround hint asserted below.
    """
    self.RunGsUtil([
        'iam', 'set', '-e', '', self.new_bucket_policy_with_conditions_path,
        self.bucket.uri
    ])

    # Assert that we get an error both with and without ch's `-f` option.
    # Without `-f`:
    stderr = self.RunGsUtil(
        ['iam', 'ch', 'allUsers:objectViewer', self.bucket.uri],
        return_stderr=True,
        expected_status=1)
    self.assertIn('CommandException: Could not patch IAM policy for', stderr)
    # Also make sure we print the workaround message.
    self.assertIn('The resource had conditions present', stderr)

    # With `-f`:
    stderr = self.RunGsUtil(
        ['iam', 'ch', '-f', 'allUsers:objectViewer', self.bucket.uri],
        return_stderr=True,
        expected_status=1)
    self.assertIn('CommandException: Some IAM policies could not be patched',
                  stderr)
    # Also make sure we print the workaround message.
    self.assertIn('Some resources had conditions', stderr)

  def test_set_blank_etag(self):
    """Tests `iam set` with a blank etag, then a set with the etag read back."""

    def _fetch_policy():
      return self.RunGsUtil(['iam', 'get', self.bucket.uri],
                            return_stdout=True)

    # First set: the etag argument is the empty string.
    self.RunGsUtil(
        ['iam', 'set', '-e', '', self.new_bucket_iam_path, self.bucket.uri])
    policy_after_set = json.loads(_fetch_policy())

    # Second set: apply the old policy file using the etag just read back.
    restore_args = [
        'iam', 'set', '-e', policy_after_set['etag'],
        self.old_bucket_iam_path, self.bucket.uri
    ]
    self.RunGsUtil(restore_args)
    policy_after_reset = _fetch_policy()

    self.assertEqualsPoliciesString(self.bucket_iam_string, policy_after_reset)
    self.assertIn(self.public_bucket_read_binding[0],
                  policy_after_set['bindings'])

  def test_set_valid_etag(self):
    """Tests that `iam set` works when each set passes the current etag."""

    def _fetch_policy():
      return self.RunGsUtil(['iam', 'get', self.bucket.uri],
                            return_stdout=True)

    def _set_with_etag(etag, policy_path):
      self.RunGsUtil(['iam', 'set', '-e', etag, policy_path, self.bucket.uri])

    # Apply the new policy with the etag of the policy currently in place.
    initial_policy = _fetch_policy()
    _set_with_etag(
        json.loads(initial_policy)['etag'], self.new_bucket_iam_path)

    # Apply the old policy file with the etag produced by the previous set.
    policy_after_set = _fetch_policy()
    _set_with_etag(
        json.loads(policy_after_set)['etag'], self.old_bucket_iam_path)

    policy_after_reset = _fetch_policy()

    self.assertEqualsPoliciesString(self.bucket_iam_string, policy_after_reset)
    expected_binding = self.public_bucket_read_binding[0]
    self.assertIn(expected_binding, json.loads(policy_after_set)['bindings'])

  def test_set_invalid_etag(self):
    """Tests that `iam set` fails when handed a malformed etag."""
    # The policy is fetched first; its output is not inspected.
    self.RunGsUtil(['iam', 'get', self.bucket.uri], return_stdout=True)

    set_args = ['iam', 'set', '-e', 'some invalid etag']
    set_args += [self.new_bucket_iam_path, self.bucket.uri]
    set_stderr = self.RunGsUtil(set_args,
                                return_stderr=True,
                                expected_status=1)

    if self._use_gcloud_storage:
      expected_text = 'DecodeError'
    else:
      expected_text = 'ArgumentException'
    self.assertIn(expected_text, set_stderr)

  def test_set_mismatched_etag(self):
    """Tests that passing the same etag to a second set raises an error."""
    initial_policy = self.RunGsUtil(['iam', 'get', self.bucket.uri],
                                    return_stdout=True)

    def _set_with_initial_etag(**run_kwargs):
      # Always passes the etag of the policy fetched at the top of the test.
      set_args = [
          'iam', 'set', '-e',
          json.loads(initial_policy)['etag'], self.new_bucket_iam_path,
          self.bucket.uri
      ]
      return self.RunGsUtil(set_args, **run_kwargs)

    # The first set uses the etag that was just read.
    _set_with_initial_etag()
    # The second set passes that same etag again and must exit with status 1.
    second_stderr = _set_with_initial_etag(return_stderr=True,
                                           expected_status=1)

    if self._use_gcloud_storage:
      expected_text = 'pre-conditions you specified did not hold'
    else:
      expected_text = 'PreconditionException'
    self.assertIn(expected_text, second_stderr)

  def _create_multiple_objects(self):
    """Creates two objects in the versioned bucket and overwrites each once.

    Both first versions are created before either overwrite. Each overwrite
    reuses the first version's object name and passes urigen(<first version>)
    as gs_idempotent_generation.

    Returns:
      A four-tuple (a, b, a*, b*) of storage_uri.BucketStorageUri instances,
      where a and b are the first versions and a*, b* are their overwrites.
    """
    first_versions = [
        self.CreateObject(bucket_uri=self.versioned_bucket, contents=payload)
        for payload in (b'foo', b'bar')
    ]
    overwrites = [
        self.CreateObject(bucket_uri=self.versioned_bucket,
                          object_name=first_version.object_name,
                          contents=payload,
                          gs_idempotent_generation=urigen(first_version))
        for first_version, payload in zip(first_versions,
                                          (b'new_foo', b'new_bar'))
    ]
    return (first_versions[0], first_versions[1], overwrites[0],
            overwrites[1])

  def test_set_valid_iam_multiple_objects(self):
    """Tests that recursive `iam set` only changes the newest versions."""
    (old_version_a, old_version_b, live_version_a,
     live_version_b) = self._create_multiple_objects()

    def _fetch_policy(url):
      return self.RunGsUtil(['iam', 'get', url], return_stdout=True)

    # Apply the policy recursively to the versioned bucket.
    set_args = ['iam', 'set', '-r']
    set_args += [self.new_object_iam_path, self.versioned_bucket.uri]
    self.RunGsUtil(set_args)

    # Both newest versions must share a policy with the public-read binding.
    live_policy_a = _fetch_policy(live_version_a.uri)
    live_policy_b = _fetch_policy(live_version_b.uri)
    self.assertEqualsPoliciesString(live_policy_a, live_policy_b)
    self.assertIn(self.public_object_read_binding[0],
                  json.loads(live_policy_a)['bindings'])

    # The old versions must still carry self.object_iam_string.
    old_policy_a = _fetch_policy(old_version_a.version_specific_uri)
    old_policy_b = _fetch_policy(old_version_b.version_specific_uri)
    self.assertEqualsPoliciesString(old_policy_a, old_policy_b)
    self.assertEqualsPoliciesString(self.object_iam_string, old_policy_a)

  def test_set_valid_iam_multithreaded_multiple_objects(self):
    """Tests recursive `iam set` on multiple objects with the -m flag."""
    (old_version_a, old_version_b, live_version_a,
     live_version_b) = self._create_multiple_objects()

    def _fetch_policy(url):
      return self.RunGsUtil(['iam', 'get', url], return_stdout=True)

    # Apply the policy recursively to the versioned bucket, with -m.
    set_args = ['-m', 'iam', 'set', '-r']
    set_args += [self.new_object_iam_path, self.versioned_bucket.uri]
    self.RunGsUtil(set_args)

    # Both newest versions must share a policy with the public-read binding.
    live_policy_a = _fetch_policy(live_version_a.uri)
    live_policy_b = _fetch_policy(live_version_b.uri)
    self.assertEqualsPoliciesString(live_policy_a, live_policy_b)
    self.assertIn(self.public_object_read_binding[0],
                  json.loads(live_policy_a)['bindings'])

    # The old versions must still carry self.object_iam_string.
    old_policy_a = _fetch_policy(old_version_a.version_specific_uri)
    old_policy_b = _fetch_policy(old_version_b.version_specific_uri)
    self.assertEqualsPoliciesString(old_policy_a, old_policy_b)
    self.assertEqualsPoliciesString(self.object_iam_string, old_policy_a)

  def test_set_valid_iam_multiple_objects_all_versions(self):
    """Tests that `iam set -ra` gives every object version the same policy."""
    (old_version_a, old_version_b, live_version_a,
     live_version_b) = self._create_multiple_objects()

    def _fetch_policy(versioned_object):
      return self.RunGsUtil(
          ['iam', 'get', versioned_object.version_specific_uri],
          return_stdout=True)

    set_args = ['iam', 'set', '-ra']
    set_args += [self.new_object_iam_path, self.versioned_bucket.uri]
    self.RunGsUtil(set_args)

    live_policy_a = _fetch_policy(live_version_a)
    live_policy_b = _fetch_policy(live_version_b)
    old_policy_a = _fetch_policy(old_version_a)
    old_policy_b = _fetch_policy(old_version_b)

    # Every other version is compared against the first overwrite's policy.
    for other_policy in (live_policy_b, old_policy_a, old_policy_b):
      self.assertEqualsPoliciesString(live_policy_a, other_policy)
    self.assertIn(self.public_object_read_binding[0],
                  json.loads(live_policy_a)['bindings'])

  def test_set_error(self):
    """Tests fail-fast behavior of iam set.

    Two fresh buckets are created and passed to a single `iam set`, with a
    URL for self.nonexistent_bucket_name placed between them.

    The test then verifies that
      1.) the first bucket received the new IAM policy,
      2.) the non-existent bucket produced an error, and
      3.) gsutil exited, leaving the second bucket's IAM policy unaltered.
    """
    first_bucket = self.CreateBucket()
    second_bucket = self.CreateBucket()

    def _fetch_policy(bucket_uri):
      return self.RunGsUtil(['iam', 'get', bucket_uri.uri], return_stdout=True)

    set_args = [
        'iam', 'set', '-e', '', self.new_bucket_iam_path, first_bucket.uri,
        'gs://%s' % self.nonexistent_bucket_name, second_bucket.uri
    ]
    set_stderr = self.RunGsUtil(set_args,
                                return_stderr=True,
                                expected_status=1)

    # The program has exited due to a bucket lookup 404.
    if self._use_gcloud_storage:
      expected_text = 'not found'
    else:
      expected_text = 'BucketNotFoundException'
    self.assertIn(expected_text, set_stderr)

    first_policy = _fetch_policy(first_bucket)
    second_policy = _fetch_policy(second_bucket)

    # The first bucket carries the new policy's public-read binding.
    self.assertIn(self.public_bucket_read_binding[0],
                  json.loads(first_policy)['bindings'])

    # The second bucket's policy still equals self.bucket_iam_string.
    self.assertEqualsPoliciesString(self.bucket_iam_string, second_policy)

  def test_set_force_error(self):
    """Tests ignoring failure behavior of iam set.

    Mirrors TestIamSet.test_set_error but passes `-f`. The test verifies that
      1.) the first bucket received the new IAM policy,
      2.) the non-existent bucket produced an error, BUT
      3.) gsutil carried on and set the second bucket's IAM policy as well.
    """
    first_bucket = self.CreateBucket()
    second_bucket = self.CreateBucket()

    def _fetch_policy(bucket_uri):
      return self.RunGsUtil(['iam', 'get', bucket_uri.uri], return_stdout=True)

    set_args = [
        'iam', 'set', '-f', self.new_bucket_iam_path, first_bucket.uri,
        'gs://%s' % self.nonexistent_bucket_name, second_bucket.uri
    ]
    set_stderr = self.RunGsUtil(set_args,
                                return_stderr=True,
                                expected_status=1)

    # The program reports that an error has occurred (due to 404).
    if self._use_gcloud_storage:
      expected_text = 'not found'
    else:
      expected_text = 'CommandException'
    self.assertIn(expected_text, set_stderr)

    first_policy = _fetch_policy(first_bucket)
    second_policy = _fetch_policy(second_bucket)

    # The first bucket carries the new policy's public-read binding.
    self.assertIn(self.public_bucket_read_binding[0],
                  json.loads(first_policy)['bindings'])

    # The second bucket ended up with the same policy as the first.
    self.assertEqualsPoliciesString(first_policy, second_policy)

  def test_set_multithreaded_error(self):
    """Tests fail-fast behavior of multithreaded iam set.

    Exercises gsutil iam set when both the -m and -r flags are present in the
    invocation.

    N.B.: Currently, (-m, -r) behaves identically to (-m, -fr) and (-fr,).
    However, (-m, -fr) and (-fr,) do not behave as expected, because
    name_expansion.NameExpansionIterator.next raises problematic errors such
    as 404 or 403. Comments in commands.iam.IamCommand._SetIam describe the
    issue in more detail.

    As a consequence, the command
      gsutil -m iam set -fr <object_policy> gs://bad_bucket gs://good_bucket

    will NOT set policies on objects in gs://good_bucket, because iterating
    over gs://bad_bucket fails first.
    """

    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckSetFails():
      set_args = [
          '-m', 'iam', 'set', '-r', self.new_object_iam_path,
          'gs://%s' % self.nonexistent_bucket_name, self.bucket.uri
      ]
      set_stderr = self.RunGsUtil(set_args,
                                  return_stderr=True,
                                  expected_status=1)
      if self._use_gcloud_storage:
        expected_text = 'not found'
      else:
        expected_text = 'BucketNotFoundException'
      self.assertIn(expected_text, set_stderr)

    # TODO(b/135780661): Remove retry after bug resolved
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckObjectPolicies():
      # Two objects are created in self.bucket and their policies compared
      # with each other and with self.object_iam_string.
      first_object = self.CreateObject(bucket_uri=self.bucket,
                                       contents=b'foobar')
      second_object = self.CreateObject(bucket_uri=self.bucket,
                                        contents=b'foobar')
      first_policy = self.RunGsUtil(['iam', 'get', first_object.uri],
                                    return_stdout=True)
      second_policy = self.RunGsUtil(['iam', 'get', second_object.uri],
                                     return_stdout=True)
      self.assertEqualsPoliciesString(first_policy, second_policy)
      self.assertEqualsPoliciesString(self.object_iam_string, first_policy)

    _CheckSetFails()
    _CheckObjectPolicies()

  def test_set_valid_iam_single_unversioned_object(self):
    """Tests setting and then restoring the IAM policy of a single object."""
    created_object = self.CreateObject(bucket_uri=self.bucket,
                                       contents=b'foobar')
    # The object is addressed through its plain (non version-specific) URI.
    object_url = created_object.uri

    def _fetch_policy():
      return self.RunGsUtil(['iam', 'get', object_url], return_stdout=True)

    self.RunGsUtil(['iam', 'set', self.new_object_iam_path, object_url])
    policy_after_set = _fetch_policy()

    # The second set passes the etag read back after the first set.
    etag = json.loads(policy_after_set)['etag']
    self.RunGsUtil(
        ['iam', 'set', '-e', etag, self.old_object_iam_path, object_url])
    policy_after_reset = _fetch_policy()

    self.assertEqualsPoliciesString(self.object_iam_string, policy_after_reset)
    expected_binding = self.public_object_read_binding[0]
    self.assertIn(expected_binding, json.loads(policy_after_set)['bindings'])

  def test_set_valid_iam_single_versioned_object(self):
    """Tests setting and restoring IAM via an object's version-specific URI."""
    created_object = self.CreateObject(bucket_uri=self.bucket,
                                       contents=b'foobar')
    # The object is addressed through its version-specific URI.
    object_url = created_object.version_specific_uri

    def _fetch_policy():
      return self.RunGsUtil(['iam', 'get', object_url], return_stdout=True)

    self.RunGsUtil(['iam', 'set', self.new_object_iam_path, object_url])
    policy_after_set = _fetch_policy()

    # The second set passes the etag read back after the first set.
    etag = json.loads(policy_after_set)['etag']
    self.RunGsUtil(
        ['iam', 'set', '-e', etag, self.old_object_iam_path, object_url])
    policy_after_reset = _fetch_policy()

    self.assertEqualsPoliciesString(self.object_iam_string, policy_after_reset)
    expected_binding = self.public_object_read_binding[0]
    self.assertIn(expected_binding, json.loads(policy_after_set)['bindings'])

  def test_set_valid_iam_multithreaded_single_object(self):
    """Tests `-m iam set` on one object, by exact URI and then by wildcard."""
    created_object = self.CreateObject(bucket_uri=self.bucket,
                                       contents=b'foobar')

    def _set_then_restore(lookup_uri):
      # Sets the new policy, reads it, sets the old policy, reads it again,
      # then checks both reads. Every set passes a blank etag.
      self.RunGsUtil(
          ['-m', 'iam', 'set', '-e', '', self.new_object_iam_path, lookup_uri])
      policy_after_set = self.RunGsUtil(['iam', 'get', lookup_uri],
                                        return_stdout=True)
      self.RunGsUtil(
          ['-m', 'iam', 'set', '-e', '', self.old_object_iam_path, lookup_uri])
      policy_after_reset = self.RunGsUtil(['iam', 'get', lookup_uri],
                                          return_stdout=True)

      self.assertEqualsPoliciesString(self.object_iam_string,
                                      policy_after_reset)
      self.assertIn(self.public_object_read_binding[0],
                    json.loads(policy_after_set)['bindings'])

    # First pass: the object's version-specific URI.
    _set_then_restore(created_object.version_specific_uri)

    # Second pass: multithreading on a single object, specified with
    # wildcards.
    _set_then_restore('%s*' % self.bucket.uri)


class TestIamShim(testcase.ShimUnitTestBase):
  # Canned successful process result whose stdout is a fake account email.
  _FAKE_CONFIG_GET_ACCOUNT_PROCESS = subprocess.CompletedProcess(
      args=[], returncode=0, stdout='fake_account@gmail.com')
  # Expected mock call for `<gcloud binary> config get account`, where the
  # binary path is resolved from 'fake_dir'. The -1 values equal
  # subprocess.PIPE.
  _MOCK_CONFIG_GET_ACCOUNT_CALL = mock.call(
      [
          shim_util._get_gcloud_binary_path('fake_dir'), 'config', 'get',
          'account'
      ],
      stderr=-1,
      stdout=-1,
      encoding='utf-8',
  )

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_get_object(self):
    """Checks the dry-run gcloud command logged for `iam get` on an object."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam', ['get', 'gs://bucket/object'],
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage objects get-iam-policy'
            ' --format=json gs://bucket/object').format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_get_bucket(self):
    """Checks the dry-run gcloud command logged for `iam get` on a bucket."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam', ['get', 'gs://bucket'],
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage buckets get-iam-policy'
            ' --format=json gs://bucket').format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_set_object(self):
    """Checks the dry-run gcloud command logged for `iam set` on objects."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    gsutil_args = ['set', 'policy-file', 'gs://b/o1', 'gs://b/o2']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      gsutil_args,
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage objects set-iam-policy'
            ' --format=json gs://b/o1 gs://b/o2 policy-file'
        ).format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_set_bucket(self):
    """Checks the dry-run gcloud command logged for `iam set` on buckets."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    gsutil_args = ['set', 'policy-file', 'gs://b1', 'gs://b2']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      gsutil_args,
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage buckets set-iam-policy'
            ' --format=json gs://b1 gs://b2 policy-file').format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_set_mix_of_bucket_and_objects_if_recursive(self):
    """Checks the dry-run command for `iam set -r` on a bucket plus an object."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    gsutil_args = ['set', '-r', 'policy-file', 'gs://b1', 'gs://b2/o']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      gsutil_args,
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage objects set-iam-policy'
            ' --format=json --recursive gs://b1 gs://b2/o policy-file'
        ).format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_raises_for_iam_set_mix_of_bucket_and_objects(self):
    """Checks that non-recursive `iam set` on a bucket plus an object raises."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    expected_error = 'Cannot operate on a mix of buckets and objects.'
    gsutil_args = ['set', 'policy-file', 'gs://b', 'gs://b/o']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        with self.assertRaisesRegex(CommandException, expected_error):
          self.RunCommand('iam', gsutil_args)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_set_handles_valid_etag(self):
    """Checks the dry-run command for `iam set -e` with a non-empty etag."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    gsutil_args = ['set', '-e', 'abc=', 'policy-file', 'gs://b']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      gsutil_args,
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage buckets set-iam-policy'
            ' --format=json --etag abc= gs://b policy-file'
        ).format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_translates_iam_set_handles_empty_etag(self):
    """Checks the dry-run command for `iam set -e` with an empty etag."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    gsutil_args = ['set', '-e', '', 'policy-file', 'gs://b']
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      gsutil_args,
                                      return_log_handler=True)
        logged_info = '\n'.join(log_handler.messages['info'])
        gcloud_binary = shim_util._get_gcloud_binary_path('fake_dir')
        expected_line = (
            'Gcloud Storage Command: {} storage buckets set-iam-policy'
            ' --format=json --etag= gs://b policy-file').format(gcloud_binary)
        self.assertIn(expected_line, logged_info)

  @mock.patch.object(iam.IamCommand, 'RunCommand', new=mock.Mock())
  def test_shim_warns_with_dry_run_mode_for_iam_ch(self):
    """Checks that `iam ch` in dry-run shim mode logs a warning instead."""
    boto_overrides = [
        ('GSUtil', 'use_gcloud_storage', 'True'),
        ('GSUtil', 'hidden_shim_mode', 'dry_run'),
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }
    expected_warning = (
        'The shim maps iam ch commands to several gcloud storage commands,'
        ' which cannot be determined without running gcloud storage.')
    with SetBotoConfigForTest(boto_overrides):
      with SetEnvironmentForTest(environment):
        log_handler = self.RunCommand('iam',
                                      ['ch', '-d', 'allUsers', 'gs://b'],
                                      return_log_handler=True)
        logged_warnings = '\n'.join(log_handler.messages['warning'])
        self.assertIn(expected_warning, logged_warnings)

  def _get_run_call(self,
                    command,
                    env=mock.ANY,
                    stdin=None,
                    stderr=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    text=True):
    return mock.call(command,
                     env=env,
                     input=stdin,
                     stderr=stderr,
                     stdout=stdout,
                     text=text)

  def test_iam_ch_adds_updates_and_deletes_bucket_policies(self):
    """Checks the gcloud calls `iam ch` makes to add and delete bindings.

    The mocked get-iam-policy call returns `policy_before`; the test expects
    the following set-iam-policy call to receive `policy_after` on stdin.
    """
    policy_before = {
        'bindings': [{
            'role': 'preserved-role',
            'members': ['allUsers'],
        }, {
            'role': 'roles/storage.modified-role',
            'members': ['allUsers', 'user:deleted-user@example.com'],
        }, {
            'role': 'roles/storage.deleted-role',
            'members': ['allUsers'],
        }]
    }
    policy_after = {
        'bindings': [{
            'role': 'preserved-role',
            'members': ['allUsers'],
        }, {
            'role': 'roles/storage.modified-role',
            'members': ['allAuthenticatedUsers', 'allUsers'],
        }]
    }
    ch_args = [
        'ch', 'allAuthenticatedUsers:modified-role', '-d',
        'user:deleted-user@example.com', '-d', 'allUsers:deleted-role',
        'gs://b'
    ]
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }

    def _bucket_command(*trailing_args):
      # Argument list of a `<gcloud binary> storage buckets ...` invocation.
      prefix = [
          shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'buckets'
      ]
      return prefix + list(trailing_args)

    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      get_result = subprocess.CompletedProcess(
          args=[], returncode=0, stdout=json.dumps(policy_before))
      set_result = subprocess.CompletedProcess(args=[], returncode=0)
      # Results handed out, in order, by the mocked subprocess run.
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS, get_result, set_result
      ]
      with SetEnvironmentForTest(environment):
        self.RunCommand('iam', ch_args)

      expected_calls = [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call(
              _bucket_command('get-iam-policy', 'gs://b/', '--format=json')),
          self._get_run_call(
              _bucket_command('set-iam-policy', 'gs://b/', '-'),
              stdin=json.dumps(policy_after, sort_keys=True)),
      ]
      self.assertEqual(self._mock_subprocess_run.call_args_list,
                       expected_calls)

  def test_iam_ch_updates_bucket_policies_for_multiple_urls(self):
    """Checks the gcloud calls `iam ch` makes when given two bucket URLs.

    For each bucket the test expects one get-iam-policy call followed by one
    set-iam-policy call whose stdin is that bucket's updated policy.
    """

    def _policy_with_members(members):
      # A policy with a single binding for roles/storage.modified-role.
      return {
          'bindings': [{
              'role': 'roles/storage.modified-role',
              'members': members,
          }]
      }

    def _bucket_command(*trailing_args):
      # Argument list of a `<gcloud binary> storage buckets ...` invocation.
      prefix = [
          shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'buckets'
      ]
      return prefix + list(trailing_args)

    policy_before1 = _policy_with_members(['user:test-user1@example.com'])
    policy_before2 = _policy_with_members(['user:test-user2@example.com'])
    policy_after1 = _policy_with_members(
        ['allAuthenticatedUsers', 'user:test-user1@example.com'])
    policy_after2 = _policy_with_members(
        ['allAuthenticatedUsers', 'user:test-user2@example.com'])
    environment = {
        'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
        'CLOUDSDK_ROOT_DIR': 'fake_dir',
    }

    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      get_result1 = subprocess.CompletedProcess(
          args=[], returncode=0, stdout=json.dumps(policy_before1))
      get_result2 = subprocess.CompletedProcess(
          args=[], returncode=0, stdout=json.dumps(policy_before2))
      # The same result object is returned for both set calls.
      set_result = subprocess.CompletedProcess(args=[], returncode=0)
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS, get_result1, set_result,
          get_result2, set_result
      ]

      with SetEnvironmentForTest(environment):
        self.RunCommand(
            'iam',
            ['ch', 'allAuthenticatedUsers:modified-role', 'gs://b1', 'gs://b2'])

      expected_calls = [self._MOCK_CONFIG_GET_ACCOUNT_CALL]
      per_bucket = (('gs://b1/', policy_after1), ('gs://b2/', policy_after2))
      for bucket_url, policy_after in per_bucket:
        expected_calls.append(
            self._get_run_call(
                _bucket_command('get-iam-policy', bucket_url,
                                '--format=json')))
        expected_calls.append(
            self._get_run_call(
                _bucket_command('set-iam-policy', bucket_url, '-'),
                stdin=json.dumps(policy_after, sort_keys=True)))
      self.assertEqual(self._mock_subprocess_run.call_args_list,
                       expected_calls)

  def test_iam_ch_updates_object_policies(self):
    """Verifies `iam ch` on an object URL runs ls, get and set via gcloud.

    The mocked policy has allUsers on the role; the policy passed to
    set-iam-policy on stdin must also include allAuthenticatedUsers.
    """
    role = 'roles/storage.modified-role'
    object_url = 'gs://b/o'
    policy_before = {'bindings': [{'role': role, 'members': ['allUsers']}]}
    policy_after = {
        'bindings': [{
            'role': role,
            'members': ['allAuthenticatedUsers', 'allUsers'],
        }]
    }
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      listing = json.dumps([{'url': object_url, 'type': 'cloud_object'}])
      # Results are consumed in call order: account lookup, ls, get, set.
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS,
          subprocess.CompletedProcess(args=[], returncode=0, stdout=listing),
          subprocess.CompletedProcess(args=[],
                                      returncode=0,
                                      stdout=json.dumps(policy_before)),
          subprocess.CompletedProcess(args=[], returncode=0),
      ]
      shim_env = {
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }
      with SetEnvironmentForTest(shim_env):
        self.RunCommand(
            'iam', ['ch', 'allAuthenticatedUsers:modified-role', object_url])

      gcloud = shim_util._get_gcloud_binary_path('fake_dir')
      expected_calls = [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call([gcloud, 'storage', 'ls', '--json', object_url]),
          self._get_run_call([
              gcloud, 'storage', 'objects', 'get-iam-policy', object_url,
              '--format=json'
          ]),
          self._get_run_call([
              gcloud,
              'storage',
              'objects',
              'set-iam-policy',
              object_url,
              '-',
          ],
                             stdin=json.dumps(policy_after, sort_keys=True)),
      ]
      self.assertEqual(self._mock_subprocess_run.call_args_list,
                       expected_calls)

  def test_iam_ch_expands_urls_with_recursion_and_ignores_container_headers(
      self):
    """Verifies `iam ch -r` only touches the cloud_object entries from ls.

    The mocked recursive listing contains two 'prefix' entries and two
    'cloud_object' entries. The test expects a get-iam-policy/set-iam-policy
    pair for each cloud_object URL and none for the prefix URLs. The stdin
    payload of the set calls is not checked (mock.ANY).
    """
    original_policy = {
        'bindings': [{
            'role': 'modified-role',
            'members': ['allUsers'],
        }]
    }
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      ls_process = subprocess.CompletedProcess(args=[],
                                               returncode=0,
                                               stdout=json.dumps([{
                                                   'url': 'gs://b/dir/',
                                                   'type': 'prefix'
                                               }, {
                                                   'url': 'gs://b/dir/:',
                                                   'type': 'cloud_object'
                                               }, {
                                                   'url': 'gs://b/dir2/',
                                                   'type': 'prefix'
                                               }, {
                                                   'url': 'gs://b/dir2/o',
                                                   'type': 'cloud_object'
                                               }]))
      get_process = subprocess.CompletedProcess(
          args=[], returncode=0, stdout=json.dumps(original_policy))
      set_process = subprocess.CompletedProcess(args=[], returncode=0)
      # One get/set pair per cloud_object in the listing. The exact
      # call_args_list assertion below allows only these two pairs, so no
      # further results need to be queued.
      self._mock_subprocess_run.side_effect = (
          [self._FAKE_CONFIG_GET_ACCOUNT_PROCESS, ls_process] +
          [get_process, set_process] * 2)
      with SetEnvironmentForTest({
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }):
        self.RunCommand(
            'iam',
            ['ch', '-r', 'allAuthenticatedUsers:modified-role', 'gs://b'])

      self.assertEqual(self._mock_subprocess_run.call_args_list, [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'ls',
              '--json', '-r', 'gs://b/'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage',
              'objects', 'get-iam-policy', 'gs://b/dir/:', '--format=json'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'),
              'storage',
              'objects',
              'set-iam-policy',
              'gs://b/dir/:',
              '-',
          ],
                             stdin=mock.ANY),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage',
              'objects', 'get-iam-policy', 'gs://b/dir2/o', '--format=json'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'),
              'storage',
              'objects',
              'set-iam-policy',
              'gs://b/dir2/o',
              '-',
          ],
                             stdin=mock.ANY)
      ])

  def test_iam_ch_raises_ls_error(self):
    """Verifies a failing ls call surfaces as a CommandException.

    The second mocked subprocess result (the listing step) has a non-zero
    return code; the exception must match its stderr text and exactly two
    subprocess calls must have been made.
    """
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      account_result = subprocess.CompletedProcess(
          args=[], returncode=0, stdout='fake_account@gmail.com')
      failed_ls = subprocess.CompletedProcess(args=[],
                                              returncode=1,
                                              stderr='An error.')
      self._mock_subprocess_run.side_effect = [account_result, failed_ls]

      shim_env = {
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }
      command_args = ['ch', 'allAuthenticatedUsers:modified-role', 'gs://b/o']
      with SetEnvironmentForTest(shim_env):
        with self.assertRaisesRegex(CommandException, 'An error.'):
          self.RunCommand('iam', command_args)
        self.assertEqual(self._mock_subprocess_run.call_count, 2)

  def test_iam_ch_raises_get_error(self):
    """Verifies a failing get-iam-policy call surfaces as a CommandException.

    The listing succeeds with one cloud_object; the third mocked subprocess
    result fails, so the exception must match its stderr text and exactly
    three subprocess calls must have been made.
    """
    object_url = 'gs://b/o'
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      account_result = subprocess.CompletedProcess(
          args=[], returncode=0, stdout='fake_account@gmail.com')
      listing = json.dumps([{'url': object_url, 'type': 'cloud_object'}])
      ls_result = subprocess.CompletedProcess(args=[],
                                              returncode=0,
                                              stdout=listing)
      failed_get = subprocess.CompletedProcess(args=[],
                                               returncode=1,
                                               stderr='An error.')
      self._mock_subprocess_run.side_effect = [
          account_result,
          ls_result,
          failed_get,
      ]

      shim_env = {
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }
      command_args = ['ch', 'allAuthenticatedUsers:modified-role', object_url]
      with SetEnvironmentForTest(shim_env):
        with self.assertRaisesRegex(CommandException, 'An error.'):
          self.RunCommand('iam', command_args)
        self.assertEqual(self._mock_subprocess_run.call_count, 3)

  def test_iam_ch_raises_set_error(self):
    """Verifies a failing set-iam-policy call surfaces as a CommandException.

    Listing and get succeed (the fetched policy has no bindings); the fourth
    mocked subprocess result fails, so the exception must match its stderr
    text and exactly four subprocess calls must have been made.
    """
    object_url = 'gs://b/o'
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      account_result = subprocess.CompletedProcess(
          args=[], returncode=0, stdout='fake_account@gmail.com')
      listing = json.dumps([{'url': object_url, 'type': 'cloud_object'}])
      ls_result = subprocess.CompletedProcess(args=[],
                                              returncode=0,
                                              stdout=listing)
      get_result = subprocess.CompletedProcess(args=[],
                                               returncode=0,
                                               stdout='{"bindings": []}')
      failed_set = subprocess.CompletedProcess(args=[],
                                               returncode=1,
                                               stderr='An error.')
      self._mock_subprocess_run.side_effect = [
          account_result,
          ls_result,
          get_result,
          failed_set,
      ]

      shim_env = {
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }
      command_args = ['ch', 'allAuthenticatedUsers:modified-role', object_url]
      with SetEnvironmentForTest(shim_env):
        with self.assertRaisesRegex(CommandException, 'An error.'):
          self.RunCommand('iam', command_args)
        self.assertEqual(self._mock_subprocess_run.call_count, 4)

  def test_iam_ch_continues_on_ls_error(self):
    """Verifies `iam ch -f` moves on to the next URL after an ls failure.

    Both mocked ls calls fail. The test expects one ls call per object URL,
    no get/set calls, and both stderr messages in the logged errors.
    """
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      ls_process = subprocess.CompletedProcess(args=[],
                                               returncode=1,
                                               stderr='An error.')
      ls_process2 = subprocess.CompletedProcess(args=[],
                                                returncode=1,
                                                stderr='Another error.')
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS, ls_process, ls_process2
      ]

      with SetEnvironmentForTest({
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }):
        mock_log_handler = self.RunCommand('iam', [
            'ch',
            '-f',
            'allAuthenticatedUsers:modified-role',
            'gs://b/o1',
            'gs://b/o2',
        ],
                                           debug=1,
                                           return_log_handler=True)

      self.assertEqual(self._mock_subprocess_run.call_args_list, [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'ls',
              '--json', 'gs://b/o1'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'ls',
              '--json', 'gs://b/o2'
          ]),
      ])

      # Both failures must be reported, not just the first one.
      error_lines = '\n'.join(mock_log_handler.messages['error'])
      self.assertIn('An error.', error_lines)
      self.assertIn('Another error.', error_lines)

  def test_iam_ch_continues_on_get_error(self):
    """Verifies `iam ch -f` moves on to the next URL after a get failure.

    For the first URL the listing succeeds and get-iam-policy fails; for the
    second URL the listing itself fails. The test expects no set call and
    both stderr messages in the logged errors.
    """
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      ls_process = subprocess.CompletedProcess(args=[],
                                               returncode=0,
                                               stdout=json.dumps([{
                                                   'url': 'gs://b/o1',
                                                   'type': 'cloud_object'
                                               }]))
      get_process = subprocess.CompletedProcess(args=[],
                                                returncode=1,
                                                stderr='An error.')
      ls_process2 = subprocess.CompletedProcess(args=[],
                                                returncode=1,
                                                stderr='Another error.')
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS, ls_process, get_process,
          ls_process2
      ]
      with SetEnvironmentForTest({
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }):
        mock_log_handler = self.RunCommand('iam', [
            'ch',
            '-f',
            'allAuthenticatedUsers:modified-role',
            'gs://b/o1',
            'gs://b/o2',
        ],
                                           debug=1,
                                           return_log_handler=True)

      self.assertEqual(self._mock_subprocess_run.call_args_list, [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'ls',
              '--json', 'gs://b/o1'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage',
              'objects', 'get-iam-policy', 'gs://b/o1', '--format=json'
          ]),
          self._get_run_call([
              shim_util._get_gcloud_binary_path('fake_dir'), 'storage', 'ls',
              '--json', 'gs://b/o2'
          ]),
      ])

      # Both failures must be reported, not just the first one.
      error_lines = '\n'.join(mock_log_handler.messages['error'])
      self.assertIn('An error.', error_lines)
      self.assertIn('Another error.', error_lines)

  def test_iam_ch_continues_on_set_error(self):
    """Verifies `iam ch -f` moves on to the next URL after a set failure.

    For the first URL the listing and get succeed and set-iam-policy fails;
    for the second URL the listing fails. Both stderr messages must appear
    in the logged errors.
    """
    role = 'roles/storage.modified-role'
    first_url = 'gs://b/o1'
    second_url = 'gs://b/o2'
    policy_before = {'bindings': [{'role': role, 'members': ['allUsers']}]}
    policy_after = {
        'bindings': [{
            'role': role,
            'members': ['allAuthenticatedUsers', 'allUsers'],
        }]
    }
    with SetBotoConfigForTest([('GSUtil', 'use_gcloud_storage', 'True')]):
      listing = json.dumps([{'url': first_url, 'type': 'cloud_object'}])
      # Results are consumed in call order: account lookup, ls, get, set for
      # the first URL, then the failing ls for the second URL.
      self._mock_subprocess_run.side_effect = [
          self._FAKE_CONFIG_GET_ACCOUNT_PROCESS,
          subprocess.CompletedProcess(args=[], returncode=0, stdout=listing),
          subprocess.CompletedProcess(args=[],
                                      returncode=0,
                                      stdout=json.dumps(policy_before)),
          subprocess.CompletedProcess(args=[],
                                      returncode=1,
                                      stderr='An error.'),
          subprocess.CompletedProcess(args=[],
                                      returncode=1,
                                      stderr='Another error.'),
      ]

      shim_env = {
          'CLOUDSDK_CORE_PASS_CREDENTIALS_TO_GSUTIL': 'True',
          'CLOUDSDK_ROOT_DIR': 'fake_dir',
      }
      command_args = [
          'ch',
          '-f',
          'allAuthenticatedUsers:modified-role',
          first_url,
          second_url,
      ]
      with SetEnvironmentForTest(shim_env):
        log_handler = self.RunCommand('iam',
                                      command_args,
                                      debug=1,
                                      return_log_handler=True)

      gcloud = shim_util._get_gcloud_binary_path('fake_dir')
      expected_calls = [
          self._MOCK_CONFIG_GET_ACCOUNT_CALL,
          self._get_run_call([gcloud, 'storage', 'ls', '--json', first_url]),
          self._get_run_call([
              gcloud, 'storage', 'objects', 'get-iam-policy', first_url,
              '--format=json'
          ]),
          self._get_run_call([
              gcloud,
              'storage',
              'objects',
              'set-iam-policy',
              first_url,
              '-',
          ],
                             stdin=json.dumps(policy_after, sort_keys=True)),
          self._get_run_call([gcloud, 'storage', 'ls', '--json', second_url]),
      ]
      self.assertEqual(self._mock_subprocess_run.call_args_list,
                       expected_calls)

      logged_errors = '\n'.join(log_handler.messages['error'])
      for expected_message in ('An error.', 'Another error.'):
        self.assertIn(expected_message, logged_errors)

Youez - 2016 - github.com/yon3zu
LinuXploit